Push pslsukyowzrp #32

Merged
coissac merged 2 commits from push-pslsukyowzrp into main 2026-06-15 16:29:25 +00:00
13 changed files with 489 additions and 613 deletions
+33 -16
View File
@@ -1,5 +1,5 @@
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, Write as _}; use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use memmap2::Mmap; use memmap2::Mmap;
@@ -230,30 +230,47 @@ impl PackedBitMatrix {
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files. /// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> { pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pbmx");
if packed_path.exists() {
// Matrix complete; remove any leftover column files from a killed cleanup.
if let Ok(meta) = MatrixMeta::load(dir) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?; let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols; let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols) // Compute offsets from file sizes — no column data loaded into RAM.
.map(|c| fs::read(col_path(dir, c))) let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
.collect::<io::Result<_>>()?; .collect::<io::Result<_>>()?;
let header_size = PBMX_HEADER + n_cols * 8; let header_size = (PBMX_HEADER + n_cols * 8) as u64;
let mut col_offset = header_size; let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols); let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files { for &size in &col_sizes {
offsets.push(col_offset as u64); offsets.push(col_offset);
col_offset += data.len(); col_offset += size;
} }
let packed_path = dir.join("matrix.pbmx"); // Write to a temp file; rename atomically so a killed process never leaves
let mut file = File::create(&packed_path)?; // a truncated matrix.pbmx that would be mistaken for a complete file.
file.write_all(&PBMX_MAGIC)?; let tmp_path = dir.join("matrix.pbmx.tmp");
file.write_all(&[0u8; 4])?; let mut out = BufWriter::new(File::create(&tmp_path)?);
file.write_all(&(meta.n as u64).to_le_bytes())?; out.write_all(&PBMX_MAGIC)?;
file.write_all(&(n_cols as u64).to_le_bytes())?; out.write_all(&[0u8; 4])?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; } out.write_all(&(meta.n as u64).to_le_bytes())?;
for data in &col_files { file.write_all(data)?; } out.write_all(&(n_cols as u64).to_le_bytes())?;
drop(file); for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
for c in 0..n_cols {
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
}
out.flush()?;
drop(out);
fs::rename(&tmp_path, &packed_path)?;
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
fs::remove_file(dir.join("meta.json"))?; fs::remove_file(dir.join("meta.json"))?;
+33 -16
View File
@@ -1,6 +1,6 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, Write as _}; use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use memmap2::Mmap; use memmap2::Mmap;
@@ -354,30 +354,47 @@ impl PackedCompactIntMatrix {
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. /// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> { pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pcmx");
if packed_path.exists() {
// Matrix complete; remove any leftover column files from a killed cleanup.
if let Ok(meta) = MatrixMeta::load(dir) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?; let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols; let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols) // Compute offsets from file sizes — no column data loaded into RAM.
.map(|c| fs::read(col_path(dir, c))) let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
.collect::<io::Result<_>>()?; .collect::<io::Result<_>>()?;
let header_size = PCMX_HEADER + n_cols * 8; let header_size = (PCMX_HEADER + n_cols * 8) as u64;
let mut col_offset = header_size; let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols); let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files { for &size in &col_sizes {
offsets.push(col_offset as u64); offsets.push(col_offset);
col_offset += data.len(); col_offset += size;
} }
let packed_path = dir.join("matrix.pcmx"); // Write to a temp file; rename atomically so a killed process never leaves
let mut file = File::create(&packed_path)?; // a truncated matrix.pcmx that would be mistaken for a complete file.
file.write_all(&PCMX_MAGIC)?; let tmp_path = dir.join("matrix.pcmx.tmp");
file.write_all(&[0u8; 4])?; let mut out = BufWriter::new(File::create(&tmp_path)?);
file.write_all(&(meta.n as u64).to_le_bytes())?; out.write_all(&PCMX_MAGIC)?;
file.write_all(&(n_cols as u64).to_le_bytes())?; out.write_all(&[0u8; 4])?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; } out.write_all(&(meta.n as u64).to_le_bytes())?;
for data in &col_files { file.write_all(data)?; } out.write_all(&(n_cols as u64).to_le_bytes())?;
drop(file); for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
for c in 0..n_cols {
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
}
out.flush()?;
drop(out);
fs::rename(&tmp_path, &packed_path)?;
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
fs::remove_file(dir.join("meta.json"))?; fs::remove_file(dir.join("meta.json"))?;
+27 -46
View File
@@ -1,8 +1,6 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use obikpartitionner::{KmerPartition, KmerSpectrum}; use obikpartitionner::{KmerPartition, KmerSpectrum};
use obilayeredmap; use obilayeredmap;
@@ -152,31 +150,25 @@ impl KmerIndex {
let with_counts = self.meta.config.with_counts; let with_counts = self.meta.config.with_counts;
let evidence = self.meta.config.evidence.clone(); let evidence = self.meta.config.evidence.clone();
let block_bits = self.meta.config.block_bits; let block_bits = self.meta.config.block_bits;
let total_kmers = AtomicUsize::new(0); let mut total_kmers: usize = 0;
let pb = progress_bar("index", n as u64, "partitions");
let pb = Arc::new(Mutex::new(progress_bar("index", n as u64, "partitions"))); let order: Vec<usize> = (0..n).collect();
let runner = crate::numa::PartitionRunner::new();
(0..n).into_par_iter().for_each(|i| { runner.run(
match self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits) { &order,
Ok(0) => {} |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits),
Ok(n_kmers) => { |i, n_kmers, _| {
total_kmers.fetch_add(n_kmers, Ordering::Relaxed); if n_kmers > 0 {
let pb = pb.lock().unwrap(); total_kmers += n_kmers;
pb.inc(1); pb.inc(1);
pb.set_message(format!("{i}: {n_kmers} kmers")); pb.set_message(format!("{i}: {n_kmers} kmers"));
} }
Err(e) => { },
eprintln!("error building layer for partition {i}: {e}"); ).map_err(OKIError::Partition)?;
std::process::exit(1);
}
}
});
pb.lock().unwrap().finish_and_clear(); pb.finish_and_clear();
info!( info!("done — {} total kmers indexed", total_kmers);
"done — {} total kmers indexed",
total_kmers.load(Ordering::Relaxed)
);
if !keep_intermediate { if !keep_intermediate {
for i in 0..n { for i in 0..n {
@@ -211,36 +203,25 @@ impl KmerIndex {
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
let n = self.n_partitions(); let n = self.n_partitions();
let errors: Vec<_> = (0..n) let order: Vec<usize> = (0..n).collect();
.into_par_iter() crate::numa::PartitionRunner::new().run(
.filter_map(|i| { &order,
|i| -> OKIResult<()> {
let index_dir = self.partition.part_dir(i).join("index"); let index_dir = self.partition.part_dir(i).join("index");
if !index_dir.exists() { return None; } if !index_dir.exists() { return Ok(()); }
let meta = match PartitionMeta::load(&index_dir) { let meta = PartitionMeta::load(&index_dir)
Ok(m) => m, .map_err(|e| OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())))?;
Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
};
for l in 0..meta.n_layers { for l in 0..meta.n_layers {
let layer_dir = index_dir.join(format!("layer_{l}")); let layer_dir = index_dir.join(format!("layer_{l}"));
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
if presence_dir.exists() { if presence_dir.exists() { pack_bit_matrix(&presence_dir).map_err(OKIError::Io)?; }
if let Err(e) = pack_bit_matrix(&presence_dir) { if counts_dir.exists() { pack_compact_int_matrix(&counts_dir).map_err(OKIError::Io)?; }
return Some(OKIError::Io(e));
}
}
if counts_dir.exists() {
if let Err(e) = pack_compact_int_matrix(&counts_dir) {
return Some(OKIError::Io(e));
}
}
} }
None Ok(())
}) },
.collect(); |_, _, _| {},
)
if let Some(e) = errors.into_iter().next() { return Err(e); }
Ok(())
} }
/// Write a `layer_meta.json` in any layer directory that is missing one. /// Write a `layer_meta.json` in any layer directory that is missing one.
+71 -175
View File
@@ -1,4 +1,4 @@
// NUMA-aware Rayon thread pools via hwlocality. // NUMA-aware partition runner via hwlocality.
// //
// Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and // Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and
// builds one Rayon ThreadPool per NUMA node with threads pinned to that node's // builds one Rayon ThreadPool per NUMA node with threads pinned to that node's
@@ -10,15 +10,14 @@
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket) // - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
// - any per-node pool fails to build // - any per-node pool fails to build
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crossbeam_channel::{RecvTimeoutError, unbounded}; use crossbeam_channel::unbounded;
use hwlocality::Topology; use hwlocality::Topology;
use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet; use hwlocality::cpu::cpuset::CpuSet;
use hwlocality::object::types::ObjectType; use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use tracing::debug; use tracing::debug;
// ── Public interface ────────────────────────────────────────────────────────── // ── Public interface ──────────────────────────────────────────────────────────
@@ -104,27 +103,6 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
.ok() .ok()
} }
// ── Adaptive spawn heuristic ──────────────────────────────────────────────────
//
// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is
// under-utilised). Subsequent workers: spawn only if the last worker raised
// efficiency by at least the expected marginal gain (1/n_workers), with a
// minimum floor to avoid spurious spawns from efficiency fluctuations.
const SPAWN_THRESHOLD: f64 = 0.95;
const MIN_MARGINAL_GAIN: f64 = 0.03;
const SPAWN_POLL: Duration = Duration::from_secs(20);
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - eff_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
}
}
// ── PartitionRunner ─────────────────────────────────────────────────────────── // ── PartitionRunner ───────────────────────────────────────────────────────────
struct NodeConfig { struct NodeConfig {
@@ -135,42 +113,38 @@ struct NodeConfig {
/// Generic NUMA-aware runner for partition-level parallel work. /// Generic NUMA-aware runner for partition-level parallel work.
/// ///
/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result /// Workers are distributed round-robin across NUMA nodes and pinned to their
/// collection. UMA systems are handled as the degenerate case of a single node /// node's CPUs. UMA systems are the degenerate case: one node, no pinning.
/// with no pinning.
/// ///
/// # Model /// # Termination
/// ///
/// One controller thread per NUMA node (one total on UMA). Each controller /// Termination is driven entirely by channel closure:
/// manages up to `max_workers` dormant workers that drain a shared work queue. ///
/// Workers are activated one at a time; a new worker is added when global CPU /// ```text
/// efficiency justifies it. On NUMA all workers are activated immediately /// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
/// (memory bandwidth, not CPU count, is the bottleneck). /// drop(result_tx) → result_rx closes → controller loop exits
/// ```
///
/// No explicit counter or sentinel needed.
pub struct PartitionRunner { pub struct PartitionRunner {
nodes: Vec<NodeConfig>, nodes: Vec<NodeConfig>,
n_cores: usize,
} }
impl PartitionRunner { impl PartitionRunner {
/// Detect topology and build. Falls back to a single-node UMA runner on /// Total worker slots across all nodes.
/// macOS, single-socket machines, or hwloc failure.
/// Total number of pre-spawned worker slots across all nodes.
pub fn max_workers(&self) -> usize { pub fn max_workers(&self) -> usize {
self.nodes.iter().map(|n| n.max_workers).sum() self.nodes.iter().map(|n| n.max_workers).sum()
} }
/// Detect topology and build. Falls back to a single-node UMA runner on
/// macOS, single-socket machines, or hwloc failure.
pub fn new() -> Self { pub fn new() -> Self {
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
match build() { match build() {
Some(ns) => { Some(ns) => {
let wpn = ns.workers_per_node(); let wpn = ns.workers_per_node();
debug!( debug!(
"PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
ns.pools.len(), ns.pools.len(), wpn,
wpn,
); );
let nodes = ns.pools let nodes = ns.pools
.into_iter() .into_iter()
@@ -181,21 +155,20 @@ impl PartitionRunner {
max_workers: wpn, max_workers: wpn,
}) })
.collect(); .collect();
Self { nodes, n_cores } Self { nodes }
} }
None => { None => {
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1); let max_workers = (n_cores / 2).max(1);
debug!( debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers);
"PartitionRunner: UMA mode — adaptive up to {} worker(s)",
max_workers,
);
Self { Self {
nodes: vec![NodeConfig { nodes: vec![NodeConfig {
pool: None, pool: None,
cpu_ids: vec![], cpu_ids: vec![],
max_workers, max_workers,
}], }],
n_cores,
} }
} }
} }
@@ -203,19 +176,17 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`. /// Run `f(i)` for every index in `order`.
/// ///
/// `on_done(i, result, elapsed)` is called under an internal mutex as each /// Workers are spawned upfront and distributed round-robin across NUMA
/// partition completes — suitable for progress bars, logging, and result /// nodes. `on_done(i, result, elapsed)` is called from the controller
/// aggregation. No `Send` or `Sync` bound is required on the callback. /// thread as each partition completes — suitable for progress bars and
/// /// result aggregation.
/// The work queue is shared across all NUMA nodes: any idle worker takes
/// the next available partition regardless of node, ensuring load balance.
/// ///
/// Returns the first error produced by `f`, if any. /// Returns the first error produced by `f`, if any.
pub fn run<F, R, E, C>( pub fn run<F, R, E, C>(
&self, &self,
order: &[usize], order: &[usize],
f: F, f: F,
on_done: C, mut on_done: C,
) -> Result<(), E> ) -> Result<(), E>
where where
F: Fn(usize) -> Result<R, E> + Send + Sync, F: Fn(usize) -> Result<R, E> + Send + Sync,
@@ -223,131 +194,56 @@ impl PartitionRunner {
E: Send, E: Send,
C: FnMut(usize, R, Duration) + Send, C: FnMut(usize, R, Duration) + Send,
{ {
let f = Arc::new(f); // Pre-load the work queue, then drop the sender so workers' part_rx
let on_done = Arc::new(Mutex::new(on_done)); // iterators terminate when the queue is drained.
let first_err: Arc<Mutex<Option<E>>> = Arc::new(Mutex::new(None));
// Shared work queue — pre-loaded in caller-supplied order.
let (part_tx, part_rx) = unbounded::<usize>(); let (part_tx, part_rx) = unbounded::<usize>();
for &i in order { for &i in order { part_tx.send(i).ok(); }
part_tx.send(i).ok();
}
drop(part_tx); drop(part_tx);
let n_cores = self.n_cores; let (result_tx, result_rx) = unbounded::<(usize, Result<R, E>, Duration)>();
let n_nodes = self.nodes.len();
let f = &f; // shared borrow; F: Sync so concurrent calls are safe
let mut first_err: Option<E> = None;
std::thread::scope(|s| { std::thread::scope(|s| {
for node in &self.nodes { // Spawn all workers upfront, round-robin across NUMA nodes.
let f = Arc::clone(&f); for w in 0..self.max_workers() {
let on_done = Arc::clone(&on_done); let node = &self.nodes[w % n_nodes];
let first_err = Arc::clone(&first_err); let prx = part_rx.clone();
let part_rx = part_rx.clone(); let rtx = result_tx.clone();
let pool = node.pool.clone();
let cpu_ids = &node.cpu_ids;
s.spawn(move || { s.spawn(move || {
// Per-node result and activation channels. if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
let (result_tx, result_rx) = for i in &prx {
unbounded::<(usize, Result<R, E>, Duration)>(); let t = Instant::now();
let (activate_tx, activate_rx) = unbounded::<()>(); let r = match &pool {
Some(p) => p.install(|| f(i)),
std::thread::scope(|ws| { None => f(i),
// Pre-spawn workers (all dormant until activated). };
for _ in 0..node.max_workers { rtx.send((i, r, t.elapsed())).ok();
let prx = part_rx.clone(); }
let rtx = result_tx.clone();
let arx = activate_rx.clone();
let f = Arc::clone(&f);
let pool = node.pool.clone();
let cpu_ids = node.cpu_ids.clone();
ws.spawn(move || {
if !cpu_ids.is_empty() {
pin_current_thread(&cpu_ids);
}
if arx.recv().is_err() {
return; // never activated — exit cleanly
}
for i in &prx {
let t = Instant::now();
let r = match &pool {
Some(p) => p.install(|| f(i)),
None => f(i),
};
rtx.send((i, r, t.elapsed())).ok();
}
});
}
// Drop the controller's copy: result_rx disconnects
// once all worker copies are also dropped (workers done).
drop(result_tx);
// In NUMA mode activate all workers immediately;
// in UMA mode activate one and grow adaptively.
let numa_mode = node.pool.is_some();
let initial = if numa_mode { node.max_workers } else { 1 };
for _ in 0..initial {
activate_tx.send(()).ok();
}
let mut active_workers = initial;
let mut cpu_sample = CpuSample::now();
let mut eff_at_last_spawn = 0.0f64;
// Controller loop.
loop {
match result_rx.recv_timeout(SPAWN_POLL) {
Ok((i, r, dur)) => {
match r {
Ok(v) => {
on_done.lock().unwrap()(i, v, dur);
}
Err(e) => {
let mut g = first_err.lock().unwrap();
if g.is_none() { *g = Some(e); }
}
}
if !numa_mode && active_workers < node.max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
debug!(
"activated worker {} — efficiency {:.0}%",
active_workers + 1,
eff * 100.0,
);
activate_tx.send(()).ok();
active_workers += 1;
eff_at_last_spawn = eff;
cpu_sample = CpuSample::now();
}
}
}
Err(RecvTimeoutError::Timeout) => {
if !numa_mode && active_workers < node.max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
debug!(
"activated worker {} (poll) — efficiency {:.0}%",
active_workers + 1,
eff * 100.0,
);
activate_tx.send(()).ok();
active_workers += 1;
eff_at_last_spawn = eff;
cpu_sample = CpuSample::now();
}
}
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
// Signal any dormant workers that were never activated
// to exit (UMA mode where max_workers was never reached).
drop(activate_tx);
}); // ws: waits for all workers of this node
}); });
} }
}); // s: waits for all node controllers
let mut g = first_err.lock().unwrap(); // Drop the controller's sender: result_rx closes once all worker
match g.take() { // rtx clones are dropped (i.e. all workers have exited).
drop(result_tx);
// Drain results concurrently with workers. The for loop exits
// when result_rx is disconnected — at that point all workers are
// done and the scope join below is instantaneous.
for (i, r, dur) in &result_rx {
match r {
Ok(v) => on_done(i, v, dur),
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
}
}
});
match first_err {
Some(e) => Err(e), Some(e) => Err(e),
None => Ok(()), None => Ok(()),
} }
+7 -15
View File
@@ -4,7 +4,6 @@ use std::path::Path;
use obikpartitionner::{KmerFilter, KmerPartition, MergeMode}; use obikpartitionner::{KmerFilter, KmerPartition, MergeMode};
use obisys::{Reporter, Stage, progress_bar}; use obisys::{Reporter, Stage, progress_bar};
use rayon::prelude::*;
use tracing::info; use tracing::info;
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
@@ -83,23 +82,16 @@ impl KmerIndex {
let src_partition = &src.partition; let src_partition = &src.partition;
let block_bits = meta.config.block_bits; let block_bits = meta.config.block_bits;
let errors: Vec<obiskio::SKError> = (0..n_partitions) let order: Vec<usize> = (0..n_partitions).collect();
.into_par_iter() let runner = crate::numa::PartitionRunner::new();
.filter_map(|i| { runner.run(
let result = dst_partition &order,
.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits) |i| dst_partition.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits),
.err(); |_, _, _| { pb.inc(1); },
pb.inc(1); ).map_err(OKIError::Partition)?;
result
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
rep.push(t.stop()); rep.push(t.stop());
// Write SENTINEL_INDEXED — output is ready to use. // Write SENTINEL_INDEXED — output is ready to use.
+8 -17
View File
@@ -3,7 +3,6 @@ use std::path::Path;
use obilayeredmap::{IndexMode, layer::Layer}; use obilayeredmap::{IndexMode, layer::Layer};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obisys::{Reporter, Stage, progress_bar}; use obisys::{Reporter, Stage, progress_bar};
use rayon::prelude::*;
use tracing::info; use tracing::info;
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
@@ -45,25 +44,17 @@ impl KmerIndex {
let t = Stage::start("reindex"); let t = Stage::start("reindex");
let pb = progress_bar("reindex", n as u64, "partitions"); let pb = progress_bar("reindex", n as u64, "partitions");
let errors: Vec<String> = (0..n) let order: Vec<usize> = (0..n).collect();
.into_par_iter() let runner = crate::numa::PartitionRunner::new();
.filter_map(|i| { runner.run(
let res = reindex_partition( &order,
&self.partition.part_dir(i).join("index"), |i| reindex_partition(&self.partition.part_dir(i).join("index"), &target, block_bits)
&target, .map_err(|e| OKIError::InvalidInput(format!("partition {i}: {e}"))),
block_bits, |_, _, _| { pb.inc(1); },
); )?;
pb.inc(1);
res.err().map(|e| format!("partition {i}: {e}"))
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::InvalidInput(e));
}
self.meta.config.evidence = target; self.meta.config.evidence = target;
if matches!(self.meta.config.evidence, IndexMode::Exact) { if matches!(self.meta.config.evidence, IndexMode::Exact) {
self.meta.config.block_bits = block_bits; self.meta.config.block_bits = block_bits;
+15 -33
View File
@@ -4,7 +4,6 @@ use std::path::Path;
use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR}; use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR};
use obisys::{Stage, progress_bar}; use obisys::{Stage, progress_bar};
use rayon::prelude::*;
use tracing::info; use tracing::info;
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
@@ -72,25 +71,16 @@ impl KmerIndex {
let pb = progress_bar("select", n_partitions as u64, "partitions"); let pb = progress_bar("select", n_partitions as u64, "partitions");
let src_partition = &src.partition; let src_partition = &src.partition;
let errors: Vec<obiskio::SKError> = (0..n_partitions) let order: Vec<usize> = (0..n_partitions).collect();
.into_par_iter() let runner = crate::numa::PartitionRunner::new();
.filter_map(|i| { runner.run(
let result = dst_partition.select_partition( &order,
src_partition, i, specs, |i| dst_partition.select_partition(src_partition, i, specs, n_src_genomes, threshold, output_presence, false),
n_src_genomes, threshold, output_presence, |_, _, _| { pb.inc(1); },
false, ).map_err(OKIError::Partition)?;
);
pb.inc(1);
result.err()
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
let _ = t.stop(); let _ = t.stop();
fs::File::create(output.join(SENTINEL_INDEXED))?; fs::File::create(output.join(SENTINEL_INDEXED))?;
@@ -132,25 +122,17 @@ impl KmerIndex {
let t = Stage::start("select"); let t = Stage::start("select");
let pb = progress_bar("select", n_partitions as u64, "partitions"); let pb = progress_bar("select", n_partitions as u64, "partitions");
let errors: Vec<obiskio::SKError> = (0..n_partitions) let partition = &self.partition;
.into_par_iter() let order: Vec<usize> = (0..n_partitions).collect();
.filter_map(|i| { let runner = crate::numa::PartitionRunner::new();
let result = self.partition.select_partition( runner.run(
&src_partition, i, specs, &order,
n_src_genomes, threshold, output_presence, |i| partition.select_partition(&src_partition, i, specs, n_src_genomes, threshold, output_presence, true),
true, |_, _, _| { pb.inc(1); },
); ).map_err(OKIError::Partition)?;
pb.inc(1);
result.err()
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
let _ = t.stop(); let _ = t.stop();
// Update index.meta with new genome list and with_counts flag. // Update index.meta with new genome list and with_counts flag.
+84
View File
@@ -0,0 +1,84 @@
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use obicompactvec::{PersistentBitVecBuilder, PersistentCompactIntVecBuilder};
use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, OLMError};
use obiskio::{SKError, SKResult};
// ── olm_to_sk ────────────────────────────────────────────────────────────────
pub(crate) fn olm_to_sk(e: OLMError, context: &'static str) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData {
context,
detail: other.to_string(),
},
}
}
// ── load_meta ────────────────────────────────────────────────────────────────
/// Load PartitionMeta, or recover it by probing layer directories.
/// Indexes built before meta.json was introduced lack the file.
pub(crate) fn load_meta(dir: &Path, context: &'static str) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) {
Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() {
n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(|e| olm_to_sk(e, context))?;
Ok(m)
}
Err(e) => Err(olm_to_sk(e, context)),
}
}
// ── path helpers ──────────────────────────────────────────────────────────────
pub(crate) fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
pub(crate) fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
pub(crate) fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
}
// ── ColBuilder ────────────────────────────────────────────────────────────────
pub(crate) enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
pub(crate) fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
pub(crate) fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
+152
View File
@@ -0,0 +1,152 @@
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::debug;
use obipipeline::{
Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
make_sink, make_source, make_transform,
throttle,
};
use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer;
use obilayeredmap::{IndexMode, Layer};
use obiskio::{SKError, SKResult};
use crate::common::olm_to_sk;
// ── KmerGraphData ─────────────────────────────────────────────────────────────
enum KmerGraphData {
File(obipipeline::Throttled<PathBuf>),
RawBatch(Vec<CanonicalKmer>),
FilteredBatch(Vec<CanonicalKmer>),
}
// ── build_graph ───────────────────────────────────────────────────────────────
/// Phase 1: pipeline that reads files, filters kmers, pushes into a GraphDeBruijn.
///
/// `flat_fn(path, emit)`: opens path, iterates kmers, calls `emit(batch)` for each batch.
/// `filter(kmer) -> bool`: secondary filter applied in the Transform stage.
pub(crate) fn build_graph<I, F, G>(
file_source: I,
flat_fn: F,
filter: G,
n_workers: usize,
max_open: usize,
) -> SKResult<GraphDeBruijn>
where
I: Iterator<Item = PathBuf> + Send + 'static,
F: Fn(PathBuf, &mut dyn FnMut(Vec<CanonicalKmer>)) -> SKResult<()> + Send + Sync + 'static,
G: Fn(CanonicalKmer) -> bool + Send + Sync + 'static,
{
let capacity = 2;
let flat_fn = Arc::new(flat_fn);
let filter = Arc::new(filter);
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
let g_sink = Arc::clone(&g_shared);
let err_cap: Arc<Mutex<Option<SKError>>> = Arc::new(Mutex::new(None));
let err_flat = Arc::clone(&err_cap);
let throttled = throttle(file_source, max_open);
let pipeline = Pipeline::new(
make_source!(KmerGraphData, throttled, File),
vec![
Stage::Flat(Arc::new(
move |data: KmerGraphData,
push: &PipelineSender<Result<KmerGraphData, PipelineError>>,
delta: &PipelineSender<isize>|
{
if let KmerGraphData::File(t) = data {
let path = t.item;
let _guard = t.guard; // released at end of block
let mut count: isize = 0;
let push_clone = push.clone();
let result = flat_fn(path, &mut |batch: Vec<CanonicalKmer>| {
push_clone.send(Ok(KmerGraphData::RawBatch(batch))).ok();
count += 1;
});
match result {
Ok(()) => {
delta.send(count - 1).ok();
}
Err(e) => {
*err_flat.lock().unwrap() = Some(e);
delta.send(-1).ok();
}
}
}
},
) as SharedFlatFn<KmerGraphData>),
make_transform!(KmerGraphData, {
let filter = Arc::clone(&filter);
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
batch.into_iter().filter(|k| filter(*k)).collect()
}
}, RawBatch, FilteredBatch),
],
make_sink!(KmerGraphData, {
move |batch: Vec<CanonicalKmer>| {
let mut g = g_sink.lock().unwrap();
for kmer in batch {
g.push(kmer);
}
}
}, FilteredBatch),
);
WorkerPool::new(pipeline, n_workers, capacity).run();
if let Some(e) = Arc::try_unwrap(err_cap)
.unwrap_or_else(|_| panic!("build_graph: err_cap not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner())
{
return Err(e);
}
let g = Arc::try_unwrap(g_shared)
.unwrap_or_else(|_| panic!("build_graph: g_shared not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
Ok(g)
}
// ── write_graph_as_unitigs ────────────────────────────────────────────────────
/// Phase 2 (write unitigs only): compute degrees, write unitigs to `layer_dir`, drop graph.
///
/// Returns n_kmers. Does NOT build the MPHF — caller does it.
pub(crate) fn write_graph_as_unitigs(g: GraphDeBruijn, layer_dir: &Path) -> SKResult<usize> {
let n_kmers = g.len();
g.compute_degrees_and_mark_starts();
std::fs::create_dir_all(layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(layer_dir).map_err(|e| olm_to_sk(e, "graph pipeline"))?;
g.try_for_each_unitig(|unitig| uw.write(unitig))?;
uw.close()?;
drop(g);
Ok(n_kmers)
}
// ── materialize_layer ─────────────────────────────────────────────────────────
/// Phase 2 (full): write_graph_as_unitigs + `Layer::<()>::build`.
///
/// Returns n_kmers.
pub(crate) fn materialize_layer(
g: GraphDeBruijn,
layer_dir: &Path,
block_bits: u8,
evidence: &IndexMode,
) -> SKResult<usize> {
let n = write_graph_as_unitigs(g, layer_dir)?;
debug!("materialize_layer: unitigs written ({n} kmers), building MPHF");
Layer::<()>::build(layer_dir, block_bits, evidence)
.map_err(|e| olm_to_sk(e, "graph pipeline"))?;
debug!("materialize_layer: MPHF build done");
Ok(n)
}
+10 -27
View File
@@ -6,24 +6,16 @@ use epserde::prelude::*;
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, OLMError, layer::Layer}; use obilayeredmap::{IndexMode, layer::Layer};
use obiskio::{SKError, SKFileMeta, SKFileReader}; use obiskio::{SKError, SKFileMeta, SKFileReader};
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
use crate::common::olm_to_sk;
use crate::graph_pipeline::{materialize_layer, write_graph_as_unitigs};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>; type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(io_err) => SKError::Io(io_err),
other => SKError::InvalidData {
context: "layer build",
detail: other.to_string(),
},
}
}
fn remove_if_exists(path: &std::path::Path) { fn remove_if_exists(path: &std::path::Path) {
if let Err(e) = fs::remove_file(path) { if let Err(e) = fs::remove_file(path) {
if e.kind() != io::ErrorKind::NotFound { if e.kind() != io::ErrorKind::NotFound {
@@ -101,18 +93,8 @@ impl KmerPartition {
} }
} }
let n_kmers = g.len(); let n_kmers = if with_counts {
g.compute_degrees_and_mark_starts(); let n = write_graph_as_unitigs(g, &layer_dir)?;
fs::create_dir_all(&layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?;
g.try_for_each_unitig(|unitig| {
uw.write(unitig)
})?;
uw.close()?;
if with_counts {
Layer::<PersistentCompactIntMatrix>::build( Layer::<PersistentCompactIntMatrix>::build(
&layer_dir, &layer_dir,
block_bits, block_bits,
@@ -122,10 +104,11 @@ impl KmerPartition {
_ => 1, _ => 1,
}, },
) )
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "layer build"))?;
n
} else { } else {
Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?; materialize_layer(g, &layer_dir, block_bits, mode)?
} };
let index_dir = layer_dir.parent().expect("layer_dir has a parent"); let index_dir = layer_dir.parent().expect("layer_dir has a parent");
PartitionMeta { PartitionMeta {
@@ -133,7 +116,7 @@ impl KmerPartition {
mode: mode.clone(), mode: mode.clone(),
} }
.save(index_dir) .save(index_dir)
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "layer build"))?;
Ok(n_kmers) Ok(n_kmers)
} }
+2
View File
@@ -1,6 +1,8 @@
pub mod filter; pub mod filter;
mod common;
mod distance; mod distance;
mod dump_layer; mod dump_layer;
mod graph_pipeline;
mod index_layer; mod index_layer;
mod kmer_sort; mod kmer_sort;
mod merge_layer; mod merge_layer;
+36 -177
View File
@@ -1,5 +1,4 @@
use std::fs; use std::fs;
use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -14,12 +13,13 @@ use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
}; };
use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly};
use obiskio::{SKError, SKResult, UnitigFileReader}; use obiskio::{SKError, SKResult, UnitigFileReader};
use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta};
use crate::graph_pipeline::{build_graph, materialize_layer};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
// ── MergeMode ───────────────────────────────────────────────────────────────── // ── MergeMode ─────────────────────────────────────────────────────────────────
@@ -30,29 +30,6 @@ pub enum MergeMode {
Count, Count,
} }
// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ─────────────
enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
// ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── // ── SrcLayerData — opened source matrix for pass-2 lookup ─────────────────────
pub(crate) enum SrcLayerData { pub(crate) enum SrcLayerData {
@@ -66,18 +43,18 @@ impl SrcLayerData {
match merge_mode { match merge_mode {
MergeMode::Presence => { MergeMode::Presence => {
if counts_dir.exists() && !layer_dir.join("presence").exists() { if counts_dir.exists() && !layer_dir.join("presence").exists() {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Count(mphf, mat)) Ok(SrcLayerData::Count(mphf, mat))
} else { } else {
// presence dir exists, or neither exists → Implicit handled by open() // presence dir exists, or neither exists → Implicit handled by open()
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Presence(mphf, mat)) Ok(SrcLayerData::Presence(mphf, mat))
} }
} }
MergeMode::Count => { MergeMode::Count => {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
if counts_dir.exists() { if counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Count(mphf, mat)) Ok(SrcLayerData::Count(mphf, mat))
@@ -107,53 +84,6 @@ impl SrcLayerData {
const INDEX_SUBDIR: &str = "index"; const INDEX_SUBDIR: &str = "index";
/// Load PartitionMeta, or recover it by probing layer directories.
/// Indexes built before meta.json was introduced lack the file.
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) {
Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() {
n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(olm_to_sk)?;
Ok(m)
}
Err(e) => Err(olm_to_sk(e)),
}
}
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData {
context: "merge",
detail: other.to_string(),
},
}
}
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
}
// ── KmerPartition::merge_partition ──────────────────────────────────────────── // ── KmerPartition::merge_partition ────────────────────────────────────────────
impl KmerPartition { impl KmerPartition {
@@ -180,8 +110,8 @@ impl KmerPartition {
return Ok(0); return Ok(0);
} }
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open load_meta(&dst_index_dir, "merge")?; // ensure meta.json exists before LayeredMap::open
let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?); let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?);
let n_dst_layers = dst_map.n_layers(); let n_dst_layers = dst_map.n_layers();
let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum();
@@ -191,7 +121,7 @@ impl KmerPartition {
for l in 0..n_dst_layers { for l in 0..n_dst_layers {
let layer_dir = dst_index_dir.join(format!("layer_{l}")); let layer_dir = dst_index_dir.join(format!("layer_{l}"));
Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n()) Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n())
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "merge"))?;
} }
} }
@@ -209,7 +139,7 @@ impl KmerPartition {
if !src_index_dir.exists() { if !src_index_dir.exists() {
continue; continue;
} }
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir, "merge")?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin");
if p.exists() { if p.exists() {
@@ -221,92 +151,36 @@ impl KmerPartition {
let n_src_layers = unitig_paths.len(); let n_src_layers = unitig_paths.len();
debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)"); debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)");
enum Pass1Data {
File((PathBuf, ThrottleGuard)),
Batch(Vec<CanonicalKmer>),
NewKmers(Vec<CanonicalKmer>),
}
const BATCH: usize = 4096; const BATCH: usize = 4096;
let n_workers = rayon::current_num_threads().min(16).max(4); let n_workers = rayon::current_num_threads().min(16).max(4);
let capacity = 2;
// At most 2 files open simultaneously: keeps n_workers-2 workers free // At most 2 files open simultaneously: keeps n_workers-2 workers free
// for the Transform stage. Each open file monopolises one worker for the // for the Transform stage. Each open file monopolises one worker for the
// full duration of its read, so this must stay well below n_workers. // full duration of its read, so this must stay well below n_workers.
let max_open = 2; let max_open = 2;
let dst_filter = Arc::clone(&dst_map); let dst_filter = Arc::clone(&dst_map);
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
let g_sink = Arc::clone(&g_shared);
let pass1_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap = Arc::clone(&pass1_err);
let throttled_paths = throttle(unitig_paths.into_iter(), max_open); let g = build_graph(
unitig_paths.into_iter(),
let pipeline = Pipeline::new( move |path: PathBuf, emit: &mut dyn FnMut(Vec<CanonicalKmer>)| -> SKResult<()> {
make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File), let reader = UnitigFileReader::open_sequential(&path)?;
vec![ let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
Stage::Flat(Arc::new( for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
move |data: Pass1Data, batch.push(kmer);
push: &PipelineSender<Result<Pass1Data, PipelineError>>, if batch.len() == BATCH {
delta: &PipelineSender<isize>| emit(std::mem::replace(&mut batch, Vec::with_capacity(BATCH)));
{
if let Pass1Data::File((path, _guard)) = data {
// _guard is dropped at end of this block, releasing the slot.
let reader = match UnitigFileReader::open_sequential(&path) {
Ok(r) => r,
Err(e) => {
*err_cap.lock().unwrap() = Some(e.to_string());
delta.send(-1).ok();
return;
}
};
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
let mut count: isize = 0;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
batch.push(kmer);
if batch.len() == BATCH {
let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH));
push.send(Ok(Pass1Data::Batch(b))).ok();
count += 1;
}
}
if !batch.is_empty() {
push.send(Ok(Pass1Data::Batch(batch))).ok();
count += 1;
}
delta.send(count - 1).ok();
}
}
) as SharedFlatFn<Pass1Data>),
make_transform!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
batch.into_iter()
.filter(|&k| dst_filter.query(k).is_none())
.collect()
}
}, Batch, NewKmers),
],
make_sink!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| {
let mut g = g_sink.lock().unwrap();
for kmer in batch {
g.push(kmer);
} }
} }
}, NewKmers), if !batch.is_empty() {
); emit(batch);
}
Ok(())
},
move |kmer| dst_filter.query(kmer).is_none(),
n_workers,
max_open,
)?;
WorkerPool::new(pipeline, n_workers, capacity).run();
if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() {
return Err(SKError::InvalidData { context: "merge pass1", detail: msg });
}
let g = Arc::try_unwrap(g_shared)
.unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
let any_new = g.len() > 0; let any_new = g.len() > 0;
debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len()); debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len());
@@ -315,25 +189,8 @@ impl KmerPartition {
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
let n_new = if any_new { let n_new = if any_new {
let t_deg = std::time::Instant::now();
g.compute_degrees_and_mark_starts();
debug!("partition {i}: compute_degrees in {:.3}s — {} nodes",
t_deg.elapsed().as_secs_f64(), g.len());
fs::create_dir_all(&new_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
debug!("partition {i}: unitig traversal start — {} nodes", g.len()); debug!("partition {i}: unitig traversal start — {} nodes", g.len());
let mut n_unitigs = 0usize; let n_nodes = materialize_layer(g, &new_layer_dir, block_bits, evidence)?;
g.try_for_each_unitig(|unitig| {
n_unitigs += 1;
uw.write(unitig)
})?;
debug!("partition {i}: unitig writer closing");
uw.close()?;
let n_nodes = g.len();
debug!("partition {i}: unitig writer closed — dropping graph ({n_nodes} nodes)");
drop(g);
debug!("partition {i}: graph dropped — starting MPHF build ({n_unitigs} unitigs)");
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
debug!("partition {i}: MPHF build done"); debug!("partition {i}: MPHF build done");
n_nodes n_nodes
} else { } else {
@@ -343,7 +200,7 @@ impl KmerPartition {
let t_open = std::time::Instant::now(); let t_open = std::time::Instant::now();
let new_mphf: Option<Arc<MphfOnly>> = if any_new { let new_mphf: Option<Arc<MphfOnly>> = if any_new {
Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)) Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(|e| olm_to_sk(e, "merge"))?))
} else { } else {
None None
}; };
@@ -449,7 +306,7 @@ impl KmerPartition {
col_offset += src_n; col_offset += src_n;
continue; continue;
} }
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir, "merge")?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
if src_layer_dir.join("unitigs.bin").exists() { if src_layer_dir.join("unitigs.bin").exists() {
@@ -483,6 +340,7 @@ impl KmerPartition {
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap2 = Arc::clone(&pass2_err); let err_cap2 = Arc::clone(&pass2_err);
let capacity = 2;
let throttled_pass2 = throttle(pass2_items.into_iter(), max_open); let throttled_pass2 = throttle(pass2_items.into_iter(), max_open);
let pipeline2 = Pipeline::new( let pipeline2 = Pipeline::new(
@@ -516,6 +374,7 @@ impl KmerPartition {
return; return;
} }
}; };
const BATCH: usize = 4096;
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH); let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
let mut count: isize = 0; let mut count: isize = 0;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
@@ -617,9 +476,9 @@ impl KmerPartition {
write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total) write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total)
.map_err(SKError::Io)?; .map_err(SKError::Io)?;
let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?;
part_meta.n_layers = new_layer_idx + 1; part_meta.n_layers = new_layer_idx + 1;
part_meta.save(&dst_index_dir).map_err(olm_to_sk)?; part_meta.save(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?;
} }
debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64()); debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64());
+11 -91
View File
@@ -1,6 +1,4 @@
use std::fs; use std::path::Path;
use std::io;
use std::path::{Path, PathBuf};
use obicompactvec::{ use obicompactvec::{
PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder,
@@ -9,85 +7,17 @@ use obicompactvec::{
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; use obilayeredmap::{IndexMode, MphfLayer};
use obiskio::{SKError, SKResult, UnitigFileReader}; use obiskio::{SKError, SKResult, UnitigFileReader};
use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta};
use crate::filter::{KmerFilter, passes_all}; use crate::filter::{KmerFilter, passes_all};
use crate::graph_pipeline::materialize_layer;
use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::merge_layer::{MergeMode, SrcLayerData};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
const INDEX_SUBDIR: &str = "index"; const INDEX_SUBDIR: &str = "index";
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData {
context: "rebuild",
detail: other.to_string(),
},
}
}
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
}
// ── ColBuilder ────────────────────────────────────────────────────────────────
enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
// ── Helpers ───────────────────────────────────────────────────────────────────
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) {
Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() {
n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(olm_to_sk)?;
Ok(m)
}
Err(e) => Err(olm_to_sk(e)),
}
}
/// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`. /// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`.
/// ///
/// Uses [`SrcLayerData`] semantics: counts take priority over presence when /// Uses [`SrcLayerData`] semantics: counts take priority over presence when
@@ -99,7 +29,7 @@ fn iter_src_layers(
filters: &[Box<dyn KmerFilter>], filters: &[Box<dyn KmerFilter>],
mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), mut cb: impl FnMut(CanonicalKmer, Box<[u32]>),
) -> SKResult<()> { ) -> SKResult<()> {
let src_meta = load_meta(src_index_dir)?; let src_meta = load_meta(src_index_dir, "rebuild")?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let unitigs_path = src_layer_dir.join("unitigs.bin"); let unitigs_path = src_layer_dir.join("unitigs.bin");
@@ -144,7 +74,7 @@ impl KmerPartition {
return Ok(()); return Ok(());
} }
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir, "rebuild")?;
if src_meta.n_layers == 0 { if src_meta.n_layers == 0 {
return Ok(()); return Ok(());
} }
@@ -159,30 +89,20 @@ impl KmerPartition {
return Ok(()); return Ok(());
} }
let n_new = g.len();
g.compute_degrees_and_mark_starts();
// ── Build MPHF in dst layer_0 ───────────────────────────────────────── // ── Build MPHF in dst layer_0 ─────────────────────────────────────────
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
let dst_layer_dir = dst_index_dir.join("layer_0"); let dst_layer_dir = dst_index_dir.join("layer_0");
fs::create_dir_all(&dst_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; let n_new = materialize_layer(g, &dst_layer_dir, block_bits, &IndexMode::Exact)?;
g.try_for_each_unitig(|unitig| { let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact)
uw.write(unitig) .map_err(|e| olm_to_sk(e, "rebuild"))?;
})?;
uw.close()?;
drop(g);
Layer::<()>::build(&dst_layer_dir, block_bits, &IndexMode::Exact).map_err(olm_to_sk)?;
let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact).map_err(olm_to_sk)?;
// ── Prepare matrix builders (one column per genome) ─────────────────── // ── Prepare matrix builders (one column per genome) ───────────────────
let data_dir = match mode { let data_dir = match mode {
MergeMode::Presence => dst_layer_dir.join("presence"), MergeMode::Presence => dst_layer_dir.join("presence"),
MergeMode::Count => dst_layer_dir.join("counts"), MergeMode::Count => dst_layer_dir.join("counts"),
}; };
fs::create_dir_all(&data_dir)?; std::fs::create_dir_all(&data_dir)?;
let mut builders: Vec<ColBuilder> = match mode { let mut builders: Vec<ColBuilder> = match mode {
MergeMode::Presence => { MergeMode::Presence => {
@@ -234,7 +154,7 @@ impl KmerPartition {
mode: IndexMode::Exact, mode: IndexMode::Exact,
} }
.save(&dst_index_dir) .save(&dst_index_dir)
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "rebuild"))?;
Ok(()) Ok(())
} }