diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index 631db63..ca1b393 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -1,5 +1,5 @@ use std::fs::{self, File}; -use std::io::{self, Write as _}; +use std::io::{self, BufWriter, Write as _}; use std::path::{Path, PathBuf}; use memmap2::Mmap; @@ -230,30 +230,47 @@ impl PackedBitMatrix { /// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files. pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> { + let packed_path = dir.join("matrix.pbmx"); + if packed_path.exists() { + // Matrix complete; remove any leftover column files from a killed cleanup. + if let Ok(meta) = MatrixMeta::load(dir) { + for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); } + let _ = fs::remove_file(dir.join("meta.json")); + } + return Ok(()); + } + let meta = MatrixMeta::load(dir)?; let n_cols = meta.n_cols; - let col_files: Vec> = (0..n_cols) - .map(|c| fs::read(col_path(dir, c))) + // Compute offsets from file sizes — no column data loaded into RAM. + let col_sizes: Vec = (0..n_cols) + .map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len())) .collect::>()?; - let header_size = PBMX_HEADER + n_cols * 8; + let header_size = (PBMX_HEADER + n_cols * 8) as u64; let mut col_offset = header_size; let mut offsets = Vec::with_capacity(n_cols); - for data in &col_files { - offsets.push(col_offset as u64); - col_offset += data.len(); + for &size in &col_sizes { + offsets.push(col_offset); + col_offset += size; } - let packed_path = dir.join("matrix.pbmx"); - let mut file = File::create(&packed_path)?; - file.write_all(&PBMX_MAGIC)?; - file.write_all(&[0u8; 4])?; - file.write_all(&(meta.n as u64).to_le_bytes())?; - file.write_all(&(n_cols as u64).to_le_bytes())?; - for &off in &offsets { file.write_all(&off.to_le_bytes())?; } - for data in &col_files { file.write_all(data)?; } - drop(file); + // Write to a temp file; rename atomically so a killed process never leaves + // a truncated matrix.pbmx that would be mistaken for a complete file. + let tmp_path = dir.join("matrix.pbmx.tmp"); + let mut out = BufWriter::new(File::create(&tmp_path)?); + out.write_all(&PBMX_MAGIC)?; + out.write_all(&[0u8; 4])?; + out.write_all(&(meta.n as u64).to_le_bytes())?; + out.write_all(&(n_cols as u64).to_le_bytes())?; + for &off in &offsets { out.write_all(&off.to_le_bytes())?; } + for c in 0..n_cols { + io::copy(&mut File::open(col_path(dir, c))?, &mut out)?; + } + out.flush()?; + drop(out); + fs::rename(&tmp_path, &packed_path)?; for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } fs::remove_file(dir.join("meta.json"))?; diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index 8db78da..b563335 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -1,6 +1,6 @@ use std::cmp::Ordering; use std::fs::{self, File}; -use std::io::{self, Write as _}; +use std::io::{self, BufWriter, Write as _}; use std::path::{Path, PathBuf}; use memmap2::Mmap; @@ -354,30 +354,47 @@ impl PackedCompactIntMatrix { /// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> { + let packed_path = dir.join("matrix.pcmx"); + if packed_path.exists() { + // Matrix complete; remove any leftover column files from a killed cleanup. + if let Ok(meta) = MatrixMeta::load(dir) { + for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); } + let _ = fs::remove_file(dir.join("meta.json")); + } + return Ok(()); + } + let meta = MatrixMeta::load(dir)?; let n_cols = meta.n_cols; - let col_files: Vec> = (0..n_cols) - .map(|c| fs::read(col_path(dir, c))) + // Compute offsets from file sizes — no column data loaded into RAM. + let col_sizes: Vec = (0..n_cols) + .map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len())) .collect::>()?; - let header_size = PCMX_HEADER + n_cols * 8; + let header_size = (PCMX_HEADER + n_cols * 8) as u64; let mut col_offset = header_size; let mut offsets = Vec::with_capacity(n_cols); - for data in &col_files { - offsets.push(col_offset as u64); - col_offset += data.len(); + for &size in &col_sizes { + offsets.push(col_offset); + col_offset += size; } - let packed_path = dir.join("matrix.pcmx"); - let mut file = File::create(&packed_path)?; - file.write_all(&PCMX_MAGIC)?; - file.write_all(&[0u8; 4])?; - file.write_all(&(meta.n as u64).to_le_bytes())?; - file.write_all(&(n_cols as u64).to_le_bytes())?; - for &off in &offsets { file.write_all(&off.to_le_bytes())?; } - for data in &col_files { file.write_all(data)?; } - drop(file); + // Write to a temp file; rename atomically so a killed process never leaves + // a truncated matrix.pcmx that would be mistaken for a complete file. + let tmp_path = dir.join("matrix.pcmx.tmp"); + let mut out = BufWriter::new(File::create(&tmp_path)?); + out.write_all(&PCMX_MAGIC)?; + out.write_all(&[0u8; 4])?; + out.write_all(&(meta.n as u64).to_le_bytes())?; + out.write_all(&(n_cols as u64).to_le_bytes())?; + for &off in &offsets { out.write_all(&off.to_le_bytes())?; } + for c in 0..n_cols { + io::copy(&mut File::open(col_path(dir, c))?, &mut out)?; + } + out.flush()?; + drop(out); + fs::rename(&tmp_path, &packed_path)?; for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } fs::remove_file(dir.join("meta.json"))?; diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs index 2c58aed..353c39a 100644 --- a/src/obikindex/src/index.rs +++ b/src/obikindex/src/index.rs @@ -1,8 +1,6 @@ use std::collections::BTreeMap; use std::fs; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; use obikpartitionner::{KmerPartition, KmerSpectrum}; use obilayeredmap; @@ -152,31 +150,25 @@ impl KmerIndex { let with_counts = self.meta.config.with_counts; let evidence = self.meta.config.evidence.clone(); let block_bits = self.meta.config.block_bits; - let total_kmers = AtomicUsize::new(0); + let mut total_kmers: usize = 0; + let pb = progress_bar("index", n as u64, "partitions"); - let pb = Arc::new(Mutex::new(progress_bar("index", n as u64, "partitions"))); - - (0..n).into_par_iter().for_each(|i| { - match self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits) { - Ok(0) => {} - Ok(n_kmers) => { - total_kmers.fetch_add(n_kmers, Ordering::Relaxed); - let pb = pb.lock().unwrap(); + let order: Vec = (0..n).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits), + |i, n_kmers, _| { + if n_kmers > 0 { + total_kmers += n_kmers; pb.inc(1); pb.set_message(format!("{i}: {n_kmers} kmers")); } - Err(e) => { - eprintln!("error building layer for partition {i}: {e}"); - std::process::exit(1); - } - } - }); + }, + ).map_err(OKIError::Partition)?; - pb.lock().unwrap().finish_and_clear(); - info!( - "done — {} total kmers indexed", - total_kmers.load(Ordering::Relaxed) - ); + pb.finish_and_clear(); + info!("done — {} total kmers indexed", total_kmers); if !keep_intermediate { for i in 0..n { @@ -211,36 +203,25 @@ impl KmerIndex { use obilayeredmap::meta::PartitionMeta; let n = self.n_partitions(); - let errors: Vec<_> = (0..n) - .into_par_iter() - .filter_map(|i| { + let order: Vec = (0..n).collect(); + crate::numa::PartitionRunner::new().run( + &order, + |i| -> OKIResult<()> { let index_dir = self.partition.part_dir(i).join("index"); - if !index_dir.exists() { return None; } - let meta = match PartitionMeta::load(&index_dir) { - Ok(m) => m, - Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))), - }; + if !index_dir.exists() { return Ok(()); } + let meta = PartitionMeta::load(&index_dir) + .map_err(|e| OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())))?; for l in 0..meta.n_layers { let layer_dir = index_dir.join(format!("layer_{l}")); let presence_dir = layer_dir.join("presence"); let counts_dir = layer_dir.join("counts"); - if presence_dir.exists() { - if let Err(e) = pack_bit_matrix(&presence_dir) { - return Some(OKIError::Io(e)); - } - } - if counts_dir.exists() { - if let Err(e) = pack_compact_int_matrix(&counts_dir) { - return Some(OKIError::Io(e)); - } - } + if presence_dir.exists() { pack_bit_matrix(&presence_dir).map_err(OKIError::Io)?; } + if counts_dir.exists() { pack_compact_int_matrix(&counts_dir).map_err(OKIError::Io)?; } } - None - }) - .collect(); - - if let Some(e) = errors.into_iter().next() { return Err(e); } - Ok(()) + Ok(()) + }, + |_, _, _| {}, + ) } /// Write a `layer_meta.json` in any layer directory that is missing one. diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index dde62b7..4c12013 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -1,4 +1,4 @@ -// NUMA-aware Rayon thread pools via hwlocality. +// NUMA-aware partition runner via hwlocality. // // Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and // builds one Rayon ThreadPool per NUMA node with threads pinned to that node's @@ -10,15 +10,14 @@ // - the system has only one NUMA node (UMA, Apple Silicon, single-socket) // - any per-node pool fails to build -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, Instant}; -use crossbeam_channel::{RecvTimeoutError, unbounded}; +use crossbeam_channel::unbounded; use hwlocality::Topology; use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::cpuset::CpuSet; use hwlocality::object::types::ObjectType; -use obisys::CpuSample; use tracing::debug; // ── Public interface ────────────────────────────────────────────────────────── @@ -104,27 +103,6 @@ fn build_pool(cpus: &[usize]) -> Option { .ok() } -// ── Adaptive spawn heuristic ────────────────────────────────────────────────── -// -// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is -// under-utilised). Subsequent workers: spawn only if the last worker raised -// efficiency by at least the expected marginal gain (1/n_workers), with a -// minimum floor to avoid spurious spawns from efficiency fluctuations. - -const SPAWN_THRESHOLD: f64 = 0.95; -const MIN_MARGINAL_GAIN: f64 = 0.03; -const SPAWN_POLL: Duration = Duration::from_secs(20); - -fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool { - if n_workers == 1 { - eff < SPAWN_THRESHOLD - } else { - let gain = eff - eff_at_last_spawn; - let expected = 1.0 / n_workers as f64; - gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN) - } -} - // ── PartitionRunner ─────────────────────────────────────────────────────────── struct NodeConfig { @@ -135,42 +113,38 @@ struct NodeConfig { /// Generic NUMA-aware runner for partition-level parallel work. /// -/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result -/// collection. UMA systems are handled as the degenerate case of a single node -/// with no pinning. +/// Workers are distributed round-robin across NUMA nodes and pinned to their +/// node's CPUs. UMA systems are the degenerate case: one node, no pinning. /// -/// # Model +/// # Termination /// -/// One controller thread per NUMA node (one total on UMA). Each controller -/// manages up to `max_workers` dormant workers that drain a shared work queue. -/// Workers are activated one at a time; a new worker is added when global CPU -/// efficiency justifies it. On NUMA all workers are activated immediately -/// (memory bandwidth, not CPU count, is the bottleneck). +/// Termination is driven entirely by channel closure: +/// +/// ```text +/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx +/// drop(result_tx) → result_rx closes → controller loop exits +/// ``` +/// +/// No explicit counter or sentinel needed. pub struct PartitionRunner { - nodes: Vec, - n_cores: usize, + nodes: Vec, } impl PartitionRunner { - /// Detect topology and build. Falls back to a single-node UMA runner on - /// macOS, single-socket machines, or hwloc failure. - /// Total number of pre-spawned worker slots across all nodes. + /// Total worker slots across all nodes. pub fn max_workers(&self) -> usize { self.nodes.iter().map(|n| n.max_workers).sum() } + /// Detect topology and build. Falls back to a single-node UMA runner on + /// macOS, single-socket machines, or hwloc failure. pub fn new() -> Self { - let n_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - match build() { Some(ns) => { let wpn = ns.workers_per_node(); debug!( "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", - ns.pools.len(), - wpn, + ns.pools.len(), wpn, ); let nodes = ns.pools .into_iter() @@ -181,21 +155,20 @@ impl PartitionRunner { max_workers: wpn, }) .collect(); - Self { nodes, n_cores } + Self { nodes } } None => { + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); let max_workers = (n_cores / 2).max(1); - debug!( - "PartitionRunner: UMA mode — adaptive up to {} worker(s)", - max_workers, - ); + debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers); Self { nodes: vec![NodeConfig { pool: None, cpu_ids: vec![], max_workers, }], - n_cores, } } } @@ -203,19 +176,17 @@ impl PartitionRunner { /// Run `f(i)` for every index in `order`. /// - /// `on_done(i, result, elapsed)` is called under an internal mutex as each - /// partition completes — suitable for progress bars, logging, and result - /// aggregation. No `Send` or `Sync` bound is required on the callback. - /// - /// The work queue is shared across all NUMA nodes: any idle worker takes - /// the next available partition regardless of node, ensuring load balance. + /// Workers are spawned upfront and distributed round-robin across NUMA + /// nodes. `on_done(i, result, elapsed)` is called from the controller + /// thread as each partition completes — suitable for progress bars and + /// result aggregation. /// /// Returns the first error produced by `f`, if any. pub fn run( &self, - order: &[usize], - f: F, - on_done: C, + order: &[usize], + f: F, + mut on_done: C, ) -> Result<(), E> where F: Fn(usize) -> Result + Send + Sync, @@ -223,131 +194,56 @@ impl PartitionRunner { E: Send, C: FnMut(usize, R, Duration) + Send, { - let f = Arc::new(f); - let on_done = Arc::new(Mutex::new(on_done)); - let first_err: Arc>> = Arc::new(Mutex::new(None)); - - // Shared work queue — pre-loaded in caller-supplied order. + // Pre-load the work queue, then drop the sender so workers' part_rx + // iterators terminate when the queue is drained. let (part_tx, part_rx) = unbounded::(); - for &i in order { - part_tx.send(i).ok(); - } + for &i in order { part_tx.send(i).ok(); } drop(part_tx); - let n_cores = self.n_cores; + let (result_tx, result_rx) = unbounded::<(usize, Result, Duration)>(); + let n_nodes = self.nodes.len(); + let f = &f; // shared borrow; F: Sync so concurrent calls are safe + + let mut first_err: Option = None; std::thread::scope(|s| { - for node in &self.nodes { - let f = Arc::clone(&f); - let on_done = Arc::clone(&on_done); - let first_err = Arc::clone(&first_err); - let part_rx = part_rx.clone(); + // Spawn all workers upfront, round-robin across NUMA nodes. + for w in 0..self.max_workers() { + let node = &self.nodes[w % n_nodes]; + let prx = part_rx.clone(); + let rtx = result_tx.clone(); + let pool = node.pool.clone(); + let cpu_ids = &node.cpu_ids; s.spawn(move || { - // Per-node result and activation channels. - let (result_tx, result_rx) = - unbounded::<(usize, Result, Duration)>(); - let (activate_tx, activate_rx) = unbounded::<()>(); - - std::thread::scope(|ws| { - // Pre-spawn workers (all dormant until activated). - for _ in 0..node.max_workers { - let prx = part_rx.clone(); - let rtx = result_tx.clone(); - let arx = activate_rx.clone(); - let f = Arc::clone(&f); - let pool = node.pool.clone(); - let cpu_ids = node.cpu_ids.clone(); - - ws.spawn(move || { - if !cpu_ids.is_empty() { - pin_current_thread(&cpu_ids); - } - if arx.recv().is_err() { - return; // never activated — exit cleanly - } - for i in &prx { - let t = Instant::now(); - let r = match &pool { - Some(p) => p.install(|| f(i)), - None => f(i), - }; - rtx.send((i, r, t.elapsed())).ok(); - } - }); - } - // Drop the controller's copy: result_rx disconnects - // once all worker copies are also dropped (workers done). - drop(result_tx); - - // In NUMA mode activate all workers immediately; - // in UMA mode activate one and grow adaptively. - let numa_mode = node.pool.is_some(); - let initial = if numa_mode { node.max_workers } else { 1 }; - for _ in 0..initial { - activate_tx.send(()).ok(); - } - let mut active_workers = initial; - let mut cpu_sample = CpuSample::now(); - let mut eff_at_last_spawn = 0.0f64; - - // Controller loop. - loop { - match result_rx.recv_timeout(SPAWN_POLL) { - Ok((i, r, dur)) => { - match r { - Ok(v) => { - on_done.lock().unwrap()(i, v, dur); - } - Err(e) => { - let mut g = first_err.lock().unwrap(); - if g.is_none() { *g = Some(e); } - } - } - if !numa_mode && active_workers < node.max_workers { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { - debug!( - "activated worker {} — efficiency {:.0}%", - active_workers + 1, - eff * 100.0, - ); - activate_tx.send(()).ok(); - active_workers += 1; - eff_at_last_spawn = eff; - cpu_sample = CpuSample::now(); - } - } - } - Err(RecvTimeoutError::Timeout) => { - if !numa_mode && active_workers < node.max_workers { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { - debug!( - "activated worker {} (poll) — efficiency {:.0}%", - active_workers + 1, - eff * 100.0, - ); - activate_tx.send(()).ok(); - active_workers += 1; - eff_at_last_spawn = eff; - cpu_sample = CpuSample::now(); - } - } - } - Err(RecvTimeoutError::Disconnected) => break, - } - } - // Signal any dormant workers that were never activated - // to exit (UMA mode where max_workers was never reached). - drop(activate_tx); - }); // ws: waits for all workers of this node + if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); } + for i in &prx { + let t = Instant::now(); + let r = match &pool { + Some(p) => p.install(|| f(i)), + None => f(i), + }; + rtx.send((i, r, t.elapsed())).ok(); + } }); } - }); // s: waits for all node controllers - let mut g = first_err.lock().unwrap(); - match g.take() { + // Drop the controller's sender: result_rx closes once all worker + // rtx clones are dropped (i.e. all workers have exited). + drop(result_tx); + + // Drain results concurrently with workers. The for loop exits + // when result_rx is disconnected — at that point all workers are + // done and the scope join below is instantaneous. + for (i, r, dur) in &result_rx { + match r { + Ok(v) => on_done(i, v, dur), + Err(e) => { if first_err.is_none() { first_err = Some(e); } } + } + } + }); + + match first_err { Some(e) => Err(e), None => Ok(()), } diff --git a/src/obikindex/src/rebuild.rs b/src/obikindex/src/rebuild.rs index 6948209..b1a8b5c 100644 --- a/src/obikindex/src/rebuild.rs +++ b/src/obikindex/src/rebuild.rs @@ -4,7 +4,6 @@ use std::path::Path; use obikpartitionner::{KmerFilter, KmerPartition, MergeMode}; use obisys::{Reporter, Stage, progress_bar}; -use rayon::prelude::*; use tracing::info; use crate::error::{OKIError, OKIResult}; @@ -83,23 +82,16 @@ impl KmerIndex { let src_partition = &src.partition; let block_bits = meta.config.block_bits; - let errors: Vec = (0..n_partitions) - .into_par_iter() - .filter_map(|i| { - let result = dst_partition - .rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits) - .err(); - pb.inc(1); - result - }) - .collect(); + let order: Vec = (0..n_partitions).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| dst_partition.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits), + |_, _, _| { pb.inc(1); }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::Partition(e)); - } - rep.push(t.stop()); // Write SENTINEL_INDEXED — output is ready to use. diff --git a/src/obikindex/src/reindex.rs b/src/obikindex/src/reindex.rs index 878d51a..db724b9 100644 --- a/src/obikindex/src/reindex.rs +++ b/src/obikindex/src/reindex.rs @@ -3,7 +3,6 @@ use std::path::Path; use obilayeredmap::{IndexMode, layer::Layer}; use obilayeredmap::meta::PartitionMeta; use obisys::{Reporter, Stage, progress_bar}; -use rayon::prelude::*; use tracing::info; use crate::error::{OKIError, OKIResult}; @@ -45,25 +44,17 @@ impl KmerIndex { let t = Stage::start("reindex"); let pb = progress_bar("reindex", n as u64, "partitions"); - let errors: Vec = (0..n) - .into_par_iter() - .filter_map(|i| { - let res = reindex_partition( - &self.partition.part_dir(i).join("index"), - &target, - block_bits, - ); - pb.inc(1); - res.err().map(|e| format!("partition {i}: {e}")) - }) - .collect(); + let order: Vec = (0..n).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| reindex_partition(&self.partition.part_dir(i).join("index"), &target, block_bits) + .map_err(|e| OKIError::InvalidInput(format!("partition {i}: {e}"))), + |_, _, _| { pb.inc(1); }, + )?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::InvalidInput(e)); - } - self.meta.config.evidence = target; if matches!(self.meta.config.evidence, IndexMode::Exact) { self.meta.config.block_bits = block_bits; diff --git a/src/obikindex/src/select.rs b/src/obikindex/src/select.rs index 653c8ef..1db57bd 100644 --- a/src/obikindex/src/select.rs +++ b/src/obikindex/src/select.rs @@ -4,7 +4,6 @@ use std::path::Path; use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR}; use obisys::{Stage, progress_bar}; -use rayon::prelude::*; use tracing::info; use crate::error::{OKIError, OKIResult}; @@ -72,25 +71,16 @@ impl KmerIndex { let pb = progress_bar("select", n_partitions as u64, "partitions"); let src_partition = &src.partition; - let errors: Vec = (0..n_partitions) - .into_par_iter() - .filter_map(|i| { - let result = dst_partition.select_partition( - src_partition, i, specs, - n_src_genomes, threshold, output_presence, - false, - ); - pb.inc(1); - result.err() - }) - .collect(); + let order: Vec = (0..n_partitions).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| dst_partition.select_partition(src_partition, i, specs, n_src_genomes, threshold, output_presence, false), + |_, _, _| { pb.inc(1); }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::Partition(e)); - } - let _ = t.stop(); fs::File::create(output.join(SENTINEL_INDEXED))?; @@ -132,25 +122,17 @@ impl KmerIndex { let t = Stage::start("select"); let pb = progress_bar("select", n_partitions as u64, "partitions"); - let errors: Vec = (0..n_partitions) - .into_par_iter() - .filter_map(|i| { - let result = self.partition.select_partition( - &src_partition, i, specs, - n_src_genomes, threshold, output_presence, - true, - ); - pb.inc(1); - result.err() - }) - .collect(); + let partition = &self.partition; + let order: Vec = (0..n_partitions).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| partition.select_partition(&src_partition, i, specs, n_src_genomes, threshold, output_presence, true), + |_, _, _| { pb.inc(1); }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::Partition(e)); - } - let _ = t.stop(); // Update index.meta with new genome list and with_counts flag.