diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index 631db63..ca1b393 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -1,5 +1,5 @@ use std::fs::{self, File}; -use std::io::{self, Write as _}; +use std::io::{self, BufWriter, Write as _}; use std::path::{Path, PathBuf}; use memmap2::Mmap; @@ -230,30 +230,47 @@ impl PackedBitMatrix { /// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files. pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> { + let packed_path = dir.join("matrix.pbmx"); + if packed_path.exists() { + // Matrix complete; remove any leftover column files from a killed cleanup. + if let Ok(meta) = MatrixMeta::load(dir) { + for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); } + let _ = fs::remove_file(dir.join("meta.json")); + } + return Ok(()); + } + let meta = MatrixMeta::load(dir)?; let n_cols = meta.n_cols; - let col_files: Vec> = (0..n_cols) - .map(|c| fs::read(col_path(dir, c))) + // Compute offsets from file sizes — no column data loaded into RAM. + let col_sizes: Vec = (0..n_cols) + .map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len())) .collect::>()?; - let header_size = PBMX_HEADER + n_cols * 8; + let header_size = (PBMX_HEADER + n_cols * 8) as u64; let mut col_offset = header_size; let mut offsets = Vec::with_capacity(n_cols); - for data in &col_files { - offsets.push(col_offset as u64); - col_offset += data.len(); + for &size in &col_sizes { + offsets.push(col_offset); + col_offset += size; } - let packed_path = dir.join("matrix.pbmx"); - let mut file = File::create(&packed_path)?; - file.write_all(&PBMX_MAGIC)?; - file.write_all(&[0u8; 4])?; - file.write_all(&(meta.n as u64).to_le_bytes())?; - file.write_all(&(n_cols as u64).to_le_bytes())?; - for &off in &offsets { file.write_all(&off.to_le_bytes())?; } - for data in &col_files { file.write_all(data)?; } - drop(file); + // Write to a temp file; rename atomically so a killed process never leaves + // a truncated matrix.pbmx that would be mistaken for a complete file. + let tmp_path = dir.join("matrix.pbmx.tmp"); + let mut out = BufWriter::new(File::create(&tmp_path)?); + out.write_all(&PBMX_MAGIC)?; + out.write_all(&[0u8; 4])?; + out.write_all(&(meta.n as u64).to_le_bytes())?; + out.write_all(&(n_cols as u64).to_le_bytes())?; + for &off in &offsets { out.write_all(&off.to_le_bytes())?; } + for c in 0..n_cols { + io::copy(&mut File::open(col_path(dir, c))?, &mut out)?; + } + out.flush()?; + drop(out); + fs::rename(&tmp_path, &packed_path)?; for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } fs::remove_file(dir.join("meta.json"))?; diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index 8db78da..b563335 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -1,6 +1,6 @@ use std::cmp::Ordering; use std::fs::{self, File}; -use std::io::{self, Write as _}; +use std::io::{self, BufWriter, Write as _}; use std::path::{Path, PathBuf}; use memmap2::Mmap; @@ -354,30 +354,47 @@ impl PackedCompactIntMatrix { /// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> { + let packed_path = dir.join("matrix.pcmx"); + if packed_path.exists() { + // Matrix complete; remove any leftover column files from a killed cleanup. + if let Ok(meta) = MatrixMeta::load(dir) { + for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); } + let _ = fs::remove_file(dir.join("meta.json")); + } + return Ok(()); + } + let meta = MatrixMeta::load(dir)?; let n_cols = meta.n_cols; - let col_files: Vec> = (0..n_cols) - .map(|c| fs::read(col_path(dir, c))) + // Compute offsets from file sizes — no column data loaded into RAM. + let col_sizes: Vec = (0..n_cols) + .map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len())) .collect::>()?; - let header_size = PCMX_HEADER + n_cols * 8; + let header_size = (PCMX_HEADER + n_cols * 8) as u64; let mut col_offset = header_size; let mut offsets = Vec::with_capacity(n_cols); - for data in &col_files { - offsets.push(col_offset as u64); - col_offset += data.len(); + for &size in &col_sizes { + offsets.push(col_offset); + col_offset += size; } - let packed_path = dir.join("matrix.pcmx"); - let mut file = File::create(&packed_path)?; - file.write_all(&PCMX_MAGIC)?; - file.write_all(&[0u8; 4])?; - file.write_all(&(meta.n as u64).to_le_bytes())?; - file.write_all(&(n_cols as u64).to_le_bytes())?; - for &off in &offsets { file.write_all(&off.to_le_bytes())?; } - for data in &col_files { file.write_all(data)?; } - drop(file); + // Write to a temp file; rename atomically so a killed process never leaves + // a truncated matrix.pcmx that would be mistaken for a complete file. + let tmp_path = dir.join("matrix.pcmx.tmp"); + let mut out = BufWriter::new(File::create(&tmp_path)?); + out.write_all(&PCMX_MAGIC)?; + out.write_all(&[0u8; 4])?; + out.write_all(&(meta.n as u64).to_le_bytes())?; + out.write_all(&(n_cols as u64).to_le_bytes())?; + for &off in &offsets { out.write_all(&off.to_le_bytes())?; } + for c in 0..n_cols { + io::copy(&mut File::open(col_path(dir, c))?, &mut out)?; + } + out.flush()?; + drop(out); + fs::rename(&tmp_path, &packed_path)?; for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } fs::remove_file(dir.join("meta.json"))?; diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs index 2c58aed..353c39a 100644 --- a/src/obikindex/src/index.rs +++ b/src/obikindex/src/index.rs @@ -1,8 +1,6 @@ use std::collections::BTreeMap; use std::fs; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; use obikpartitionner::{KmerPartition, KmerSpectrum}; use obilayeredmap; @@ -152,31 +150,25 @@ impl KmerIndex { let with_counts = self.meta.config.with_counts; let evidence = self.meta.config.evidence.clone(); let block_bits = self.meta.config.block_bits; - let total_kmers = AtomicUsize::new(0); + let mut total_kmers: usize = 0; + let pb = progress_bar("index", n as u64, "partitions"); - let pb = Arc::new(Mutex::new(progress_bar("index", n as u64, "partitions"))); - - (0..n).into_par_iter().for_each(|i| { - match self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits) { - Ok(0) => {} - Ok(n_kmers) => { - total_kmers.fetch_add(n_kmers, Ordering::Relaxed); - let pb = pb.lock().unwrap(); + let order: Vec = (0..n).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits), + |i, n_kmers, _| { + if n_kmers > 0 { + total_kmers += n_kmers; pb.inc(1); pb.set_message(format!("{i}: {n_kmers} kmers")); } - Err(e) => { - eprintln!("error building layer for partition {i}: {e}"); - std::process::exit(1); - } - } - }); + }, + ).map_err(OKIError::Partition)?; - pb.lock().unwrap().finish_and_clear(); - info!( - "done — {} total kmers indexed", - total_kmers.load(Ordering::Relaxed) - ); + pb.finish_and_clear(); + info!("done — {} total kmers indexed", total_kmers); if !keep_intermediate { for i in 0..n { @@ -211,36 +203,25 @@ impl KmerIndex { use obilayeredmap::meta::PartitionMeta; let n = self.n_partitions(); - let errors: Vec<_> = (0..n) - .into_par_iter() - .filter_map(|i| { + let order: Vec = (0..n).collect(); + crate::numa::PartitionRunner::new().run( + &order, + |i| -> OKIResult<()> { let index_dir = self.partition.part_dir(i).join("index"); - if !index_dir.exists() { return None; } - let meta = match PartitionMeta::load(&index_dir) { - Ok(m) => m, - Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))), - }; + if !index_dir.exists() { return Ok(()); } + let meta = PartitionMeta::load(&index_dir) + .map_err(|e| OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())))?; for l in 0..meta.n_layers { let layer_dir = index_dir.join(format!("layer_{l}")); let presence_dir = layer_dir.join("presence"); let counts_dir = layer_dir.join("counts"); - if presence_dir.exists() { - if let Err(e) = pack_bit_matrix(&presence_dir) { - return Some(OKIError::Io(e)); - } - } - if counts_dir.exists() { - if let Err(e) = pack_compact_int_matrix(&counts_dir) { - return Some(OKIError::Io(e)); - } - } + if presence_dir.exists() { pack_bit_matrix(&presence_dir).map_err(OKIError::Io)?; } + if counts_dir.exists() { pack_compact_int_matrix(&counts_dir).map_err(OKIError::Io)?; } } - None - }) - .collect(); - - if let Some(e) = errors.into_iter().next() { return Err(e); } - Ok(()) + Ok(()) + }, + |_, _, _| {}, + ) } /// Write a `layer_meta.json` in any layer directory that is missing one. diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index dde62b7..4c12013 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -1,4 +1,4 @@ -// NUMA-aware Rayon thread pools via hwlocality. +// NUMA-aware partition runner via hwlocality. // // Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and // builds one Rayon ThreadPool per NUMA node with threads pinned to that node's @@ -10,15 +10,14 @@ // - the system has only one NUMA node (UMA, Apple Silicon, single-socket) // - any per-node pool fails to build -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, Instant}; -use crossbeam_channel::{RecvTimeoutError, unbounded}; +use crossbeam_channel::unbounded; use hwlocality::Topology; use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::cpuset::CpuSet; use hwlocality::object::types::ObjectType; -use obisys::CpuSample; use tracing::debug; // ── Public interface ────────────────────────────────────────────────────────── @@ -104,27 +103,6 @@ fn build_pool(cpus: &[usize]) -> Option { .ok() } -// ── Adaptive spawn heuristic ────────────────────────────────────────────────── -// -// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is -// under-utilised). Subsequent workers: spawn only if the last worker raised -// efficiency by at least the expected marginal gain (1/n_workers), with a -// minimum floor to avoid spurious spawns from efficiency fluctuations. - -const SPAWN_THRESHOLD: f64 = 0.95; -const MIN_MARGINAL_GAIN: f64 = 0.03; -const SPAWN_POLL: Duration = Duration::from_secs(20); - -fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool { - if n_workers == 1 { - eff < SPAWN_THRESHOLD - } else { - let gain = eff - eff_at_last_spawn; - let expected = 1.0 / n_workers as f64; - gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN) - } -} - // ── PartitionRunner ─────────────────────────────────────────────────────────── struct NodeConfig { @@ -135,42 +113,38 @@ struct NodeConfig { /// Generic NUMA-aware runner for partition-level parallel work. /// -/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result -/// collection. UMA systems are handled as the degenerate case of a single node -/// with no pinning. +/// Workers are distributed round-robin across NUMA nodes and pinned to their +/// node's CPUs. UMA systems are the degenerate case: one node, no pinning. /// -/// # Model +/// # Termination /// -/// One controller thread per NUMA node (one total on UMA). Each controller -/// manages up to `max_workers` dormant workers that drain a shared work queue. -/// Workers are activated one at a time; a new worker is added when global CPU -/// efficiency justifies it. On NUMA all workers are activated immediately -/// (memory bandwidth, not CPU count, is the bottleneck). +/// Termination is driven entirely by channel closure: +/// +/// ```text +/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx +/// drop(result_tx) → result_rx closes → controller loop exits +/// ``` +/// +/// No explicit counter or sentinel needed. pub struct PartitionRunner { - nodes: Vec, - n_cores: usize, + nodes: Vec, } impl PartitionRunner { - /// Detect topology and build. Falls back to a single-node UMA runner on - /// macOS, single-socket machines, or hwloc failure. - /// Total number of pre-spawned worker slots across all nodes. + /// Total worker slots across all nodes. pub fn max_workers(&self) -> usize { self.nodes.iter().map(|n| n.max_workers).sum() } + /// Detect topology and build. Falls back to a single-node UMA runner on + /// macOS, single-socket machines, or hwloc failure. pub fn new() -> Self { - let n_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - match build() { Some(ns) => { let wpn = ns.workers_per_node(); debug!( "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", - ns.pools.len(), - wpn, + ns.pools.len(), wpn, ); let nodes = ns.pools .into_iter() @@ -181,21 +155,20 @@ impl PartitionRunner { max_workers: wpn, }) .collect(); - Self { nodes, n_cores } + Self { nodes } } None => { + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); let max_workers = (n_cores / 2).max(1); - debug!( - "PartitionRunner: UMA mode — adaptive up to {} worker(s)", - max_workers, - ); + debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers); Self { nodes: vec![NodeConfig { pool: None, cpu_ids: vec![], max_workers, }], - n_cores, } } } @@ -203,19 +176,17 @@ impl PartitionRunner { /// Run `f(i)` for every index in `order`. /// - /// `on_done(i, result, elapsed)` is called under an internal mutex as each - /// partition completes — suitable for progress bars, logging, and result - /// aggregation. No `Send` or `Sync` bound is required on the callback. - /// - /// The work queue is shared across all NUMA nodes: any idle worker takes - /// the next available partition regardless of node, ensuring load balance. + /// Workers are spawned upfront and distributed round-robin across NUMA + /// nodes. `on_done(i, result, elapsed)` is called from the controller + /// thread as each partition completes — suitable for progress bars and + /// result aggregation. /// /// Returns the first error produced by `f`, if any. pub fn run( &self, - order: &[usize], - f: F, - on_done: C, + order: &[usize], + f: F, + mut on_done: C, ) -> Result<(), E> where F: Fn(usize) -> Result + Send + Sync, @@ -223,131 +194,56 @@ impl PartitionRunner { E: Send, C: FnMut(usize, R, Duration) + Send, { - let f = Arc::new(f); - let on_done = Arc::new(Mutex::new(on_done)); - let first_err: Arc>> = Arc::new(Mutex::new(None)); - - // Shared work queue — pre-loaded in caller-supplied order. + // Pre-load the work queue, then drop the sender so workers' part_rx + // iterators terminate when the queue is drained. let (part_tx, part_rx) = unbounded::(); - for &i in order { - part_tx.send(i).ok(); - } + for &i in order { part_tx.send(i).ok(); } drop(part_tx); - let n_cores = self.n_cores; + let (result_tx, result_rx) = unbounded::<(usize, Result, Duration)>(); + let n_nodes = self.nodes.len(); + let f = &f; // shared borrow; F: Sync so concurrent calls are safe + + let mut first_err: Option = None; std::thread::scope(|s| { - for node in &self.nodes { - let f = Arc::clone(&f); - let on_done = Arc::clone(&on_done); - let first_err = Arc::clone(&first_err); - let part_rx = part_rx.clone(); + // Spawn all workers upfront, round-robin across NUMA nodes. + for w in 0..self.max_workers() { + let node = &self.nodes[w % n_nodes]; + let prx = part_rx.clone(); + let rtx = result_tx.clone(); + let pool = node.pool.clone(); + let cpu_ids = &node.cpu_ids; s.spawn(move || { - // Per-node result and activation channels. - let (result_tx, result_rx) = - unbounded::<(usize, Result, Duration)>(); - let (activate_tx, activate_rx) = unbounded::<()>(); - - std::thread::scope(|ws| { - // Pre-spawn workers (all dormant until activated). - for _ in 0..node.max_workers { - let prx = part_rx.clone(); - let rtx = result_tx.clone(); - let arx = activate_rx.clone(); - let f = Arc::clone(&f); - let pool = node.pool.clone(); - let cpu_ids = node.cpu_ids.clone(); - - ws.spawn(move || { - if !cpu_ids.is_empty() { - pin_current_thread(&cpu_ids); - } - if arx.recv().is_err() { - return; // never activated — exit cleanly - } - for i in &prx { - let t = Instant::now(); - let r = match &pool { - Some(p) => p.install(|| f(i)), - None => f(i), - }; - rtx.send((i, r, t.elapsed())).ok(); - } - }); - } - // Drop the controller's copy: result_rx disconnects - // once all worker copies are also dropped (workers done). - drop(result_tx); - - // In NUMA mode activate all workers immediately; - // in UMA mode activate one and grow adaptively. - let numa_mode = node.pool.is_some(); - let initial = if numa_mode { node.max_workers } else { 1 }; - for _ in 0..initial { - activate_tx.send(()).ok(); - } - let mut active_workers = initial; - let mut cpu_sample = CpuSample::now(); - let mut eff_at_last_spawn = 0.0f64; - - // Controller loop. - loop { - match result_rx.recv_timeout(SPAWN_POLL) { - Ok((i, r, dur)) => { - match r { - Ok(v) => { - on_done.lock().unwrap()(i, v, dur); - } - Err(e) => { - let mut g = first_err.lock().unwrap(); - if g.is_none() { *g = Some(e); } - } - } - if !numa_mode && active_workers < node.max_workers { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { - debug!( - "activated worker {} — efficiency {:.0}%", - active_workers + 1, - eff * 100.0, - ); - activate_tx.send(()).ok(); - active_workers += 1; - eff_at_last_spawn = eff; - cpu_sample = CpuSample::now(); - } - } - } - Err(RecvTimeoutError::Timeout) => { - if !numa_mode && active_workers < node.max_workers { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { - debug!( - "activated worker {} (poll) — efficiency {:.0}%", - active_workers + 1, - eff * 100.0, - ); - activate_tx.send(()).ok(); - active_workers += 1; - eff_at_last_spawn = eff; - cpu_sample = CpuSample::now(); - } - } - } - Err(RecvTimeoutError::Disconnected) => break, - } - } - // Signal any dormant workers that were never activated - // to exit (UMA mode where max_workers was never reached). - drop(activate_tx); - }); // ws: waits for all workers of this node + if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); } + for i in &prx { + let t = Instant::now(); + let r = match &pool { + Some(p) => p.install(|| f(i)), + None => f(i), + }; + rtx.send((i, r, t.elapsed())).ok(); + } }); } - }); // s: waits for all node controllers - let mut g = first_err.lock().unwrap(); - match g.take() { + // Drop the controller's sender: result_rx closes once all worker + // rtx clones are dropped (i.e. all workers have exited). + drop(result_tx); + + // Drain results concurrently with workers. The for loop exits + // when result_rx is disconnected — at that point all workers are + // done and the scope join below is instantaneous. + for (i, r, dur) in &result_rx { + match r { + Ok(v) => on_done(i, v, dur), + Err(e) => { if first_err.is_none() { first_err = Some(e); } } + } + } + }); + + match first_err { Some(e) => Err(e), None => Ok(()), } diff --git a/src/obikindex/src/rebuild.rs b/src/obikindex/src/rebuild.rs index 6948209..b1a8b5c 100644 --- a/src/obikindex/src/rebuild.rs +++ b/src/obikindex/src/rebuild.rs @@ -4,7 +4,6 @@ use std::path::Path; use obikpartitionner::{KmerFilter, KmerPartition, MergeMode}; use obisys::{Reporter, Stage, progress_bar}; -use rayon::prelude::*; use tracing::info; use crate::error::{OKIError, OKIResult}; @@ -83,23 +82,16 @@ impl KmerIndex { let src_partition = &src.partition; let block_bits = meta.config.block_bits; - let errors: Vec = (0..n_partitions) - .into_par_iter() - .filter_map(|i| { - let result = dst_partition - .rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits) - .err(); - pb.inc(1); - result - }) - .collect(); + let order: Vec = (0..n_partitions).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| dst_partition.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits), + |_, _, _| { pb.inc(1); }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::Partition(e)); - } - rep.push(t.stop()); // Write SENTINEL_INDEXED — output is ready to use. diff --git a/src/obikindex/src/reindex.rs b/src/obikindex/src/reindex.rs index 878d51a..db724b9 100644 --- a/src/obikindex/src/reindex.rs +++ b/src/obikindex/src/reindex.rs @@ -3,7 +3,6 @@ use std::path::Path; use obilayeredmap::{IndexMode, layer::Layer}; use obilayeredmap::meta::PartitionMeta; use obisys::{Reporter, Stage, progress_bar}; -use rayon::prelude::*; use tracing::info; use crate::error::{OKIError, OKIResult}; @@ -45,25 +44,17 @@ impl KmerIndex { let t = Stage::start("reindex"); let pb = progress_bar("reindex", n as u64, "partitions"); - let errors: Vec = (0..n) - .into_par_iter() - .filter_map(|i| { - let res = reindex_partition( - &self.partition.part_dir(i).join("index"), - &target, - block_bits, - ); - pb.inc(1); - res.err().map(|e| format!("partition {i}: {e}")) - }) - .collect(); + let order: Vec = (0..n).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| reindex_partition(&self.partition.part_dir(i).join("index"), &target, block_bits) + .map_err(|e| OKIError::InvalidInput(format!("partition {i}: {e}"))), + |_, _, _| { pb.inc(1); }, + )?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::InvalidInput(e)); - } - self.meta.config.evidence = target; if matches!(self.meta.config.evidence, IndexMode::Exact) { self.meta.config.block_bits = block_bits; diff --git a/src/obikindex/src/select.rs b/src/obikindex/src/select.rs index 653c8ef..1db57bd 100644 --- a/src/obikindex/src/select.rs +++ b/src/obikindex/src/select.rs @@ -4,7 +4,6 @@ use std::path::Path; use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR}; use obisys::{Stage, progress_bar}; -use rayon::prelude::*; use tracing::info; use crate::error::{OKIError, OKIResult}; @@ -72,25 +71,16 @@ impl KmerIndex { let pb = progress_bar("select", n_partitions as u64, "partitions"); let src_partition = &src.partition; - let errors: Vec = (0..n_partitions) - .into_par_iter() - .filter_map(|i| { - let result = dst_partition.select_partition( - src_partition, i, specs, - n_src_genomes, threshold, output_presence, - false, - ); - pb.inc(1); - result.err() - }) - .collect(); + let order: Vec = (0..n_partitions).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| dst_partition.select_partition(src_partition, i, specs, n_src_genomes, threshold, output_presence, false), + |_, _, _| { pb.inc(1); }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::Partition(e)); - } - let _ = t.stop(); fs::File::create(output.join(SENTINEL_INDEXED))?; @@ -132,25 +122,17 @@ impl KmerIndex { let t = Stage::start("select"); let pb = progress_bar("select", n_partitions as u64, "partitions"); - let errors: Vec = (0..n_partitions) - .into_par_iter() - .filter_map(|i| { - let result = self.partition.select_partition( - &src_partition, i, specs, - n_src_genomes, threshold, output_presence, - true, - ); - pb.inc(1); - result.err() - }) - .collect(); + let partition = &self.partition; + let order: Vec = (0..n_partitions).collect(); + let runner = crate::numa::PartitionRunner::new(); + runner.run( + &order, + |i| partition.select_partition(&src_partition, i, specs, n_src_genomes, threshold, output_presence, true), + |_, _, _| { pb.inc(1); }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(OKIError::Partition(e)); - } - let _ = t.stop(); // Update index.meta with new genome list and with_counts flag. diff --git a/src/obikpartitionner/src/common.rs b/src/obikpartitionner/src/common.rs new file mode 100644 index 0000000..99e345e --- /dev/null +++ b/src/obikpartitionner/src/common.rs @@ -0,0 +1,84 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use obicompactvec::{PersistentBitVecBuilder, PersistentCompactIntVecBuilder}; +use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::{IndexMode, OLMError}; +use obiskio::{SKError, SKResult}; + +// ── olm_to_sk ──────────────────────────────────────────────────────────────── + +pub(crate) fn olm_to_sk(e: OLMError, context: &'static str) -> SKError { + match e { + OLMError::Io(e) => SKError::Io(e), + other => SKError::InvalidData { + context, + detail: other.to_string(), + }, + } +} + +// ── load_meta ──────────────────────────────────────────────────────────────── + +/// Load PartitionMeta, or recover it by probing layer directories. +/// Indexes built before meta.json was introduced lack the file. +pub(crate) fn load_meta(dir: &Path, context: &'static str) -> SKResult { + match PartitionMeta::load(dir) { + Ok(m) => Ok(m), + Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => + { + let mut n = 0usize; + while dir.join(format!("layer_{n}")).exists() { + n += 1; + } + let m = PartitionMeta { + n_layers: n, + mode: IndexMode::default(), + }; + m.save(dir).map_err(|e| olm_to_sk(e, context))?; + Ok(m) + } + Err(e) => Err(olm_to_sk(e, context)), + } +} + +// ── path helpers ────────────────────────────────────────────────────────────── + +pub(crate) fn col_path_bit(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pbiv")) +} + +pub(crate) fn col_path_int(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pciv")) +} + +pub(crate) fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { + fs::write( + dir.join("meta.json"), + format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), + ) +} + +// ── ColBuilder ──────────────────────────────────────────────────────────────── + +pub(crate) enum ColBuilder { + Bit(PersistentBitVecBuilder), + Int(PersistentCompactIntVecBuilder), +} + +impl ColBuilder { + pub(crate) fn set_val(&mut self, slot: usize, value: u32) { + match self { + ColBuilder::Bit(b) => b.set(slot, value > 0), + ColBuilder::Int(b) => b.set(slot, value), + } + } + + pub(crate) fn close(self) -> SKResult<()> { + match self { + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), + } + } +} diff --git a/src/obikpartitionner/src/graph_pipeline.rs b/src/obikpartitionner/src/graph_pipeline.rs new file mode 100644 index 0000000..c1c8302 --- /dev/null +++ b/src/obikpartitionner/src/graph_pipeline.rs @@ -0,0 +1,152 @@ +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use tracing::debug; +use obipipeline::{ + Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool, + make_sink, make_source, make_transform, + throttle, +}; + +use obidebruinj::GraphDeBruijn; +use obikseq::CanonicalKmer; +use obilayeredmap::{IndexMode, Layer}; +use obiskio::{SKError, SKResult}; + +use crate::common::olm_to_sk; + +// ── KmerGraphData ───────────────────────────────────────────────────────────── + +enum KmerGraphData { + File(obipipeline::Throttled), + RawBatch(Vec), + FilteredBatch(Vec), +} + +// ── build_graph ─────────────────────────────────────────────────────────────── + +/// Phase 1: pipeline that reads files, filters kmers, pushes into a GraphDeBruijn. +/// +/// `flat_fn(path, emit)`: opens path, iterates kmers, calls `emit(batch)` for each batch. +/// `filter(kmer) -> bool`: secondary filter applied in the Transform stage. +pub(crate) fn build_graph( + file_source: I, + flat_fn: F, + filter: G, + n_workers: usize, + max_open: usize, +) -> SKResult +where + I: Iterator + Send + 'static, + F: Fn(PathBuf, &mut dyn FnMut(Vec)) -> SKResult<()> + Send + Sync + 'static, + G: Fn(CanonicalKmer) -> bool + Send + Sync + 'static, +{ + let capacity = 2; + + let flat_fn = Arc::new(flat_fn); + let filter = Arc::new(filter); + let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new())); + let g_sink = Arc::clone(&g_shared); + let err_cap: Arc>> = Arc::new(Mutex::new(None)); + let err_flat = Arc::clone(&err_cap); + + let throttled = throttle(file_source, max_open); + + let pipeline = Pipeline::new( + make_source!(KmerGraphData, throttled, File), + vec![ + Stage::Flat(Arc::new( + move |data: KmerGraphData, + push: &PipelineSender>, + delta: &PipelineSender| + { + if let KmerGraphData::File(t) = data { + let path = t.item; + let _guard = t.guard; // released at end of block + let mut count: isize = 0; + let push_clone = push.clone(); + let result = flat_fn(path, &mut |batch: Vec| { + push_clone.send(Ok(KmerGraphData::RawBatch(batch))).ok(); + count += 1; + }); + match result { + Ok(()) => { + delta.send(count - 1).ok(); + } + Err(e) => { + *err_flat.lock().unwrap() = Some(e); + delta.send(-1).ok(); + } + } + } + }, + ) as SharedFlatFn), + make_transform!(KmerGraphData, { + let filter = Arc::clone(&filter); + move |batch: Vec| -> Vec { + batch.into_iter().filter(|k| filter(*k)).collect() + } + }, RawBatch, FilteredBatch), + ], + make_sink!(KmerGraphData, { + move |batch: Vec| { + let mut g = g_sink.lock().unwrap(); + for kmer in batch { + g.push(kmer); + } + } + }, FilteredBatch), + ); + + WorkerPool::new(pipeline, n_workers, capacity).run(); + + if let Some(e) = Arc::try_unwrap(err_cap) + .unwrap_or_else(|_| panic!("build_graph: err_cap not uniquely owned after pipeline")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()) + { + return Err(e); + } + + let g = Arc::try_unwrap(g_shared) + .unwrap_or_else(|_| panic!("build_graph: g_shared not uniquely owned after pipeline")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()); + + Ok(g) +} + +// ── write_graph_as_unitigs ──────────────────────────────────────────────────── + +/// Phase 2 (write unitigs only): compute degrees, write unitigs to `layer_dir`, drop graph. +/// +/// Returns n_kmers. Does NOT build the MPHF — caller does it. +pub(crate) fn write_graph_as_unitigs(g: GraphDeBruijn, layer_dir: &Path) -> SKResult { + let n_kmers = g.len(); + g.compute_degrees_and_mark_starts(); + std::fs::create_dir_all(layer_dir)?; + let mut uw = Layer::<()>::unitig_writer(layer_dir).map_err(|e| olm_to_sk(e, "graph pipeline"))?; + g.try_for_each_unitig(|unitig| uw.write(unitig))?; + uw.close()?; + drop(g); + Ok(n_kmers) +} + +// ── materialize_layer ───────────────────────────────────────────────────────── + +/// Phase 2 (full): write_graph_as_unitigs + `Layer::<()>::build`. +/// +/// Returns n_kmers. +pub(crate) fn materialize_layer( + g: GraphDeBruijn, + layer_dir: &Path, + block_bits: u8, + evidence: &IndexMode, +) -> SKResult { + let n = write_graph_as_unitigs(g, layer_dir)?; + debug!("materialize_layer: unitigs written ({n} kmers), building MPHF"); + Layer::<()>::build(layer_dir, block_bits, evidence) + .map_err(|e| olm_to_sk(e, "graph pipeline"))?; + debug!("materialize_layer: MPHF build done"); + Ok(n) +} diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index 8eed6db..6b48df5 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -6,24 +6,16 @@ use epserde::prelude::*; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obidebruinj::GraphDeBruijn; use obilayeredmap::meta::PartitionMeta; -use obilayeredmap::{IndexMode, OLMError, layer::Layer}; +use obilayeredmap::{IndexMode, layer::Layer}; use obiskio::{SKError, SKFileMeta, SKFileReader}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; +use crate::common::olm_to_sk; +use crate::graph_pipeline::{materialize_layer, write_graph_as_unitigs}; use crate::partition::KmerPartition; type Mphf = PtrHash>, Xx64, Vec>; -fn olm_to_sk(e: OLMError) -> SKError { - match e { - OLMError::Io(io_err) => SKError::Io(io_err), - other => SKError::InvalidData { - context: "layer build", - detail: other.to_string(), - }, - } -} - fn remove_if_exists(path: &std::path::Path) { if let Err(e) = fs::remove_file(path) { if e.kind() != io::ErrorKind::NotFound { @@ -101,18 +93,8 @@ impl KmerPartition { } } - let n_kmers = g.len(); - g.compute_degrees_and_mark_starts(); - - fs::create_dir_all(&layer_dir)?; - - let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|unitig| { - uw.write(unitig) - })?; - uw.close()?; - - if with_counts { + let n_kmers = if with_counts { + let n = write_graph_as_unitigs(g, &layer_dir)?; Layer::::build( &layer_dir, block_bits, @@ -122,10 +104,11 @@ impl KmerPartition { _ => 1, }, ) - .map_err(olm_to_sk)?; + .map_err(|e| olm_to_sk(e, "layer build"))?; + n } else { - Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?; - } + materialize_layer(g, &layer_dir, block_bits, mode)? + }; let index_dir = layer_dir.parent().expect("layer_dir has a parent"); PartitionMeta { @@ -133,7 +116,7 @@ impl KmerPartition { mode: mode.clone(), } .save(index_dir) - .map_err(olm_to_sk)?; + .map_err(|e| olm_to_sk(e, "layer build"))?; Ok(n_kmers) } diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index fe09264..a63e2c9 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,6 +1,8 @@ pub mod filter; +mod common; mod distance; mod dump_layer; +mod graph_pipeline; mod index_layer; mod kmer_sort; mod merge_layer; diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index d56e71a..0701b6d 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -1,5 +1,4 @@ use std::fs; -use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; @@ -14,12 +13,13 @@ use obicompactvec::{ PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, }; -use obidebruinj::GraphDeBruijn; use obikseq::CanonicalKmer; use obilayeredmap::meta::PartitionMeta; -use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; +use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly}; use obiskio::{SKError, SKResult, UnitigFileReader}; +use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta}; +use crate::graph_pipeline::{build_graph, materialize_layer}; use crate::partition::KmerPartition; // ── MergeMode ───────────────────────────────────────────────────────────────── @@ -30,29 +30,6 @@ pub enum MergeMode { Count, } -// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ───────────── - -enum ColBuilder { - Bit(PersistentBitVecBuilder), - Int(PersistentCompactIntVecBuilder), -} - -impl ColBuilder { - fn set_val(&mut self, slot: usize, value: u32) { - match self { - ColBuilder::Bit(b) => b.set(slot, value > 0), - ColBuilder::Int(b) => b.set(slot, value), - } - } - - fn close(self) -> SKResult<()> { - match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), - } - } -} - // ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── pub(crate) enum SrcLayerData { @@ -66,18 +43,18 @@ impl SrcLayerData { match merge_mode { MergeMode::Presence => { if counts_dir.exists() && !layer_dir.join("presence").exists() { - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } else { // presence dir exists, or neither exists → Implicit handled by open() - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?; let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Presence(mphf, mat)) } } MergeMode::Count => { - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?; if counts_dir.exists() { let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) @@ -107,53 +84,6 @@ impl SrcLayerData { const INDEX_SUBDIR: &str = "index"; -/// Load PartitionMeta, or recover it by probing layer directories. -/// Indexes built before meta.json was introduced lack the file. -fn load_meta(dir: &Path) -> SKResult { - match PartitionMeta::load(dir) { - Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => - { - let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { - n += 1; - } - let m = PartitionMeta { - n_layers: n, - mode: IndexMode::default(), - }; - m.save(dir).map_err(olm_to_sk)?; - Ok(m) - } - Err(e) => Err(olm_to_sk(e)), - } -} - -fn olm_to_sk(e: OLMError) -> SKError { - match e { - OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { - context: "merge", - detail: other.to_string(), - }, - } -} - -fn col_path_bit(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pbiv")) -} - -fn col_path_int(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pciv")) -} - -fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write( - dir.join("meta.json"), - format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), - ) -} - // ── KmerPartition::merge_partition ──────────────────────────────────────────── impl KmerPartition { @@ -180,8 +110,8 @@ impl KmerPartition { return Ok(0); } - load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open - let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?); + load_meta(&dst_index_dir, "merge")?; // ensure meta.json exists before LayeredMap::open + let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?); let n_dst_layers = dst_map.n_layers(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); @@ -191,7 +121,7 @@ impl KmerPartition { for l in 0..n_dst_layers { let layer_dir = dst_index_dir.join(format!("layer_{l}")); Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n()) - .map_err(olm_to_sk)?; + .map_err(|e| olm_to_sk(e, "merge"))?; } } @@ -209,7 +139,7 @@ impl KmerPartition { if !src_index_dir.exists() { continue; } - let src_meta = load_meta(&src_index_dir)?; + let src_meta = load_meta(&src_index_dir, "merge")?; for l in 0..src_meta.n_layers { let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); if p.exists() { @@ -221,92 +151,36 @@ impl KmerPartition { let n_src_layers = unitig_paths.len(); debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)"); - enum Pass1Data { - File((PathBuf, ThrottleGuard)), - Batch(Vec), - NewKmers(Vec), - } - const BATCH: usize = 4096; let n_workers = rayon::current_num_threads().min(16).max(4); - let capacity = 2; // At most 2 files open simultaneously: keeps n_workers-2 workers free // for the Transform stage. Each open file monopolises one worker for the // full duration of its read, so this must stay well below n_workers. - let max_open = 2; + let max_open = 2; let dst_filter = Arc::clone(&dst_map); - let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new())); - let g_sink = Arc::clone(&g_shared); - let pass1_err: Arc>> = Arc::new(Mutex::new(None)); - let err_cap = Arc::clone(&pass1_err); - let throttled_paths = throttle(unitig_paths.into_iter(), max_open); - - let pipeline = Pipeline::new( - make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File), - vec![ - Stage::Flat(Arc::new( - move |data: Pass1Data, - push: &PipelineSender>, - delta: &PipelineSender| - { - if let Pass1Data::File((path, _guard)) = data { - // _guard is dropped at end of this block, releasing the slot. - let reader = match UnitigFileReader::open_sequential(&path) { - Ok(r) => r, - Err(e) => { - *err_cap.lock().unwrap() = Some(e.to_string()); - delta.send(-1).ok(); - return; - } - }; - let mut batch: Vec = Vec::with_capacity(BATCH); - let mut count: isize = 0; - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - batch.push(kmer); - if batch.len() == BATCH { - let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH)); - push.send(Ok(Pass1Data::Batch(b))).ok(); - count += 1; - } - } - if !batch.is_empty() { - push.send(Ok(Pass1Data::Batch(batch))).ok(); - count += 1; - } - delta.send(count - 1).ok(); - } - } - ) as SharedFlatFn), - make_transform!(Pass1Data, { - move |batch: Vec| -> Vec { - batch.into_iter() - .filter(|&k| dst_filter.query(k).is_none()) - .collect() - } - }, Batch, NewKmers), - ], - make_sink!(Pass1Data, { - move |batch: Vec| { - let mut g = g_sink.lock().unwrap(); - for kmer in batch { - g.push(kmer); + let g = build_graph( + unitig_paths.into_iter(), + move |path: PathBuf, emit: &mut dyn FnMut(Vec)| -> SKResult<()> { + let reader = UnitigFileReader::open_sequential(&path)?; + let mut batch: Vec = Vec::with_capacity(BATCH); + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + batch.push(kmer); + if batch.len() == BATCH { + emit(std::mem::replace(&mut batch, Vec::with_capacity(BATCH))); } } - }, NewKmers), - ); + if !batch.is_empty() { + emit(batch); + } + Ok(()) + }, + move |kmer| dst_filter.query(kmer).is_none(), + n_workers, + max_open, + )?; - WorkerPool::new(pipeline, n_workers, capacity).run(); - - if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() { - return Err(SKError::InvalidData { context: "merge pass1", detail: msg }); - } - - let g = Arc::try_unwrap(g_shared) - .unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline")) - .into_inner() - .unwrap_or_else(|e| e.into_inner()); let any_new = g.len() > 0; debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len()); @@ -315,25 +189,8 @@ impl KmerPartition { let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let n_new = if any_new { - let t_deg = std::time::Instant::now(); - g.compute_degrees_and_mark_starts(); - debug!("partition {i}: compute_degrees in {:.3}s — {} nodes", - t_deg.elapsed().as_secs_f64(), g.len()); - fs::create_dir_all(&new_layer_dir)?; - let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; debug!("partition {i}: unitig traversal start — {} nodes", g.len()); - let mut n_unitigs = 0usize; - g.try_for_each_unitig(|unitig| { - n_unitigs += 1; - uw.write(unitig) - })?; - debug!("partition {i}: unitig writer closing"); - uw.close()?; - let n_nodes = g.len(); - debug!("partition {i}: unitig writer closed — dropping graph ({n_nodes} nodes)"); - drop(g); - debug!("partition {i}: graph dropped — starting MPHF build ({n_unitigs} unitigs)"); - Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; + let n_nodes = materialize_layer(g, &new_layer_dir, block_bits, evidence)?; debug!("partition {i}: MPHF build done"); n_nodes } else { @@ -343,7 +200,7 @@ impl KmerPartition { let t_open = std::time::Instant::now(); let new_mphf: Option> = if any_new { - Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)) + Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(|e| olm_to_sk(e, "merge"))?)) } else { None }; @@ -449,7 +306,7 @@ impl KmerPartition { col_offset += src_n; continue; } - let src_meta = load_meta(&src_index_dir)?; + let src_meta = load_meta(&src_index_dir, "merge")?; for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); if src_layer_dir.join("unitigs.bin").exists() { @@ -483,6 +340,7 @@ impl KmerPartition { let pass2_err: Arc>> = Arc::new(Mutex::new(None)); let err_cap2 = Arc::clone(&pass2_err); + let capacity = 2; let throttled_pass2 = throttle(pass2_items.into_iter(), max_open); let pipeline2 = Pipeline::new( @@ -516,6 +374,7 @@ impl KmerPartition { return; } }; + const BATCH: usize = 4096; let mut batch: Vec = Vec::with_capacity(BATCH); let mut count: isize = 0; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { @@ -617,9 +476,9 @@ impl KmerPartition { write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total) .map_err(SKError::Io)?; - let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; + let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?; part_meta.n_layers = new_layer_idx + 1; - part_meta.save(&dst_index_dir).map_err(olm_to_sk)?; + part_meta.save(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?; } debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64()); diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index ea09536..6bd40f3 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -1,6 +1,4 @@ -use std::fs; -use std::io; -use std::path::{Path, PathBuf}; +use std::path::Path; use obicompactvec::{ PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder, @@ -9,85 +7,17 @@ use obicompactvec::{ use obidebruinj::GraphDeBruijn; use obikseq::CanonicalKmer; use obilayeredmap::meta::PartitionMeta; -use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; +use obilayeredmap::{IndexMode, MphfLayer}; use obiskio::{SKError, SKResult, UnitigFileReader}; +use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta}; use crate::filter::{KmerFilter, passes_all}; +use crate::graph_pipeline::materialize_layer; use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::partition::KmerPartition; const INDEX_SUBDIR: &str = "index"; -fn olm_to_sk(e: OLMError) -> SKError { - match e { - OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { - context: "rebuild", - detail: other.to_string(), - }, - } -} - -fn col_path_bit(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pbiv")) -} - -fn col_path_int(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pciv")) -} - -fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write( - dir.join("meta.json"), - format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), - ) -} - -// ── ColBuilder ──────────────────────────────────────────────────────────────── - -enum ColBuilder { - Bit(PersistentBitVecBuilder), - Int(PersistentCompactIntVecBuilder), -} - -impl ColBuilder { - fn set_val(&mut self, slot: usize, value: u32) { - match self { - ColBuilder::Bit(b) => b.set(slot, value > 0), - ColBuilder::Int(b) => b.set(slot, value), - } - } - - fn close(self) -> SKResult<()> { - match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), - } - } -} - -// ── Helpers ─────────────────────────────────────────────────────────────────── - -fn load_meta(dir: &Path) -> SKResult { - match PartitionMeta::load(dir) { - Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => - { - let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { - n += 1; - } - let m = PartitionMeta { - n_layers: n, - mode: IndexMode::default(), - }; - m.save(dir).map_err(olm_to_sk)?; - Ok(m) - } - Err(e) => Err(olm_to_sk(e)), - } -} - /// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`. /// /// Uses [`SrcLayerData`] semantics: counts take priority over presence when @@ -99,7 +29,7 @@ fn iter_src_layers( filters: &[Box], mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), ) -> SKResult<()> { - let src_meta = load_meta(src_index_dir)?; + let src_meta = load_meta(src_index_dir, "rebuild")?; for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let unitigs_path = src_layer_dir.join("unitigs.bin"); @@ -144,7 +74,7 @@ impl KmerPartition { return Ok(()); } - let src_meta = load_meta(&src_index_dir)?; + let src_meta = load_meta(&src_index_dir, "rebuild")?; if src_meta.n_layers == 0 { return Ok(()); } @@ -159,30 +89,20 @@ impl KmerPartition { return Ok(()); } - let n_new = g.len(); - g.compute_degrees_and_mark_starts(); - // ── Build MPHF in dst layer_0 ───────────────────────────────────────── let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); let dst_layer_dir = dst_index_dir.join("layer_0"); - fs::create_dir_all(&dst_layer_dir)?; - let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|unitig| { - uw.write(unitig) - })?; - uw.close()?; - drop(g); - - Layer::<()>::build(&dst_layer_dir, block_bits, &IndexMode::Exact).map_err(olm_to_sk)?; - let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact).map_err(olm_to_sk)?; + let n_new = materialize_layer(g, &dst_layer_dir, block_bits, &IndexMode::Exact)?; + let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact) + .map_err(|e| olm_to_sk(e, "rebuild"))?; // ── Prepare matrix builders (one column per genome) ─────────────────── let data_dir = match mode { MergeMode::Presence => dst_layer_dir.join("presence"), MergeMode::Count => dst_layer_dir.join("counts"), }; - fs::create_dir_all(&data_dir)?; + std::fs::create_dir_all(&data_dir)?; let mut builders: Vec = match mode { MergeMode::Presence => { @@ -234,7 +154,7 @@ impl KmerPartition { mode: IndexMode::Exact, } .save(&dst_index_dir) - .map_err(olm_to_sk)?; + .map_err(|e| olm_to_sk(e, "rebuild"))?; Ok(()) }