Merge pull request 'Push pslsukyowzrp' (#32) from push-pslsukyowzrp into main
Reviewed-on: #32
This commit was merged in pull request #32.
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use std::fs::{self, File};
|
||||
use std::io::{self, Write as _};
|
||||
use std::io::{self, BufWriter, Write as _};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use memmap2::Mmap;
|
||||
@@ -230,30 +230,47 @@ impl PackedBitMatrix {
|
||||
|
||||
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
|
||||
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
|
||||
let packed_path = dir.join("matrix.pbmx");
|
||||
if packed_path.exists() {
|
||||
// Matrix complete; remove any leftover column files from a killed cleanup.
|
||||
if let Ok(meta) = MatrixMeta::load(dir) {
|
||||
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
|
||||
let _ = fs::remove_file(dir.join("meta.json"));
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let meta = MatrixMeta::load(dir)?;
|
||||
let n_cols = meta.n_cols;
|
||||
|
||||
let col_files: Vec<Vec<u8>> = (0..n_cols)
|
||||
.map(|c| fs::read(col_path(dir, c)))
|
||||
// Compute offsets from file sizes — no column data loaded into RAM.
|
||||
let col_sizes: Vec<u64> = (0..n_cols)
|
||||
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
|
||||
.collect::<io::Result<_>>()?;
|
||||
|
||||
let header_size = PBMX_HEADER + n_cols * 8;
|
||||
let header_size = (PBMX_HEADER + n_cols * 8) as u64;
|
||||
let mut col_offset = header_size;
|
||||
let mut offsets = Vec::with_capacity(n_cols);
|
||||
for data in &col_files {
|
||||
offsets.push(col_offset as u64);
|
||||
col_offset += data.len();
|
||||
for &size in &col_sizes {
|
||||
offsets.push(col_offset);
|
||||
col_offset += size;
|
||||
}
|
||||
|
||||
let packed_path = dir.join("matrix.pbmx");
|
||||
let mut file = File::create(&packed_path)?;
|
||||
file.write_all(&PBMX_MAGIC)?;
|
||||
file.write_all(&[0u8; 4])?;
|
||||
file.write_all(&(meta.n as u64).to_le_bytes())?;
|
||||
file.write_all(&(n_cols as u64).to_le_bytes())?;
|
||||
for &off in &offsets { file.write_all(&off.to_le_bytes())?; }
|
||||
for data in &col_files { file.write_all(data)?; }
|
||||
drop(file);
|
||||
// Write to a temp file; rename atomically so a killed process never leaves
|
||||
// a truncated matrix.pbmx that would be mistaken for a complete file.
|
||||
let tmp_path = dir.join("matrix.pbmx.tmp");
|
||||
let mut out = BufWriter::new(File::create(&tmp_path)?);
|
||||
out.write_all(&PBMX_MAGIC)?;
|
||||
out.write_all(&[0u8; 4])?;
|
||||
out.write_all(&(meta.n as u64).to_le_bytes())?;
|
||||
out.write_all(&(n_cols as u64).to_le_bytes())?;
|
||||
for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
|
||||
for c in 0..n_cols {
|
||||
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
|
||||
}
|
||||
out.flush()?;
|
||||
drop(out);
|
||||
fs::rename(&tmp_path, &packed_path)?;
|
||||
|
||||
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
|
||||
fs::remove_file(dir.join("meta.json"))?;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{self, Write as _};
|
||||
use std::io::{self, BufWriter, Write as _};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use memmap2::Mmap;
|
||||
@@ -354,30 +354,47 @@ impl PackedCompactIntMatrix {
|
||||
|
||||
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
|
||||
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
|
||||
let packed_path = dir.join("matrix.pcmx");
|
||||
if packed_path.exists() {
|
||||
// Matrix complete; remove any leftover column files from a killed cleanup.
|
||||
if let Ok(meta) = MatrixMeta::load(dir) {
|
||||
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
|
||||
let _ = fs::remove_file(dir.join("meta.json"));
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let meta = MatrixMeta::load(dir)?;
|
||||
let n_cols = meta.n_cols;
|
||||
|
||||
let col_files: Vec<Vec<u8>> = (0..n_cols)
|
||||
.map(|c| fs::read(col_path(dir, c)))
|
||||
// Compute offsets from file sizes — no column data loaded into RAM.
|
||||
let col_sizes: Vec<u64> = (0..n_cols)
|
||||
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
|
||||
.collect::<io::Result<_>>()?;
|
||||
|
||||
let header_size = PCMX_HEADER + n_cols * 8;
|
||||
let header_size = (PCMX_HEADER + n_cols * 8) as u64;
|
||||
let mut col_offset = header_size;
|
||||
let mut offsets = Vec::with_capacity(n_cols);
|
||||
for data in &col_files {
|
||||
offsets.push(col_offset as u64);
|
||||
col_offset += data.len();
|
||||
for &size in &col_sizes {
|
||||
offsets.push(col_offset);
|
||||
col_offset += size;
|
||||
}
|
||||
|
||||
let packed_path = dir.join("matrix.pcmx");
|
||||
let mut file = File::create(&packed_path)?;
|
||||
file.write_all(&PCMX_MAGIC)?;
|
||||
file.write_all(&[0u8; 4])?;
|
||||
file.write_all(&(meta.n as u64).to_le_bytes())?;
|
||||
file.write_all(&(n_cols as u64).to_le_bytes())?;
|
||||
for &off in &offsets { file.write_all(&off.to_le_bytes())?; }
|
||||
for data in &col_files { file.write_all(data)?; }
|
||||
drop(file);
|
||||
// Write to a temp file; rename atomically so a killed process never leaves
|
||||
// a truncated matrix.pcmx that would be mistaken for a complete file.
|
||||
let tmp_path = dir.join("matrix.pcmx.tmp");
|
||||
let mut out = BufWriter::new(File::create(&tmp_path)?);
|
||||
out.write_all(&PCMX_MAGIC)?;
|
||||
out.write_all(&[0u8; 4])?;
|
||||
out.write_all(&(meta.n as u64).to_le_bytes())?;
|
||||
out.write_all(&(n_cols as u64).to_le_bytes())?;
|
||||
for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
|
||||
for c in 0..n_cols {
|
||||
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
|
||||
}
|
||||
out.flush()?;
|
||||
drop(out);
|
||||
fs::rename(&tmp_path, &packed_path)?;
|
||||
|
||||
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
|
||||
fs::remove_file(dir.join("meta.json"))?;
|
||||
|
||||
+26
-45
@@ -1,8 +1,6 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use obikpartitionner::{KmerPartition, KmerSpectrum};
|
||||
use obilayeredmap;
|
||||
@@ -152,31 +150,25 @@ impl KmerIndex {
|
||||
let with_counts = self.meta.config.with_counts;
|
||||
let evidence = self.meta.config.evidence.clone();
|
||||
let block_bits = self.meta.config.block_bits;
|
||||
let total_kmers = AtomicUsize::new(0);
|
||||
let mut total_kmers: usize = 0;
|
||||
let pb = progress_bar("index", n as u64, "partitions");
|
||||
|
||||
let pb = Arc::new(Mutex::new(progress_bar("index", n as u64, "partitions")));
|
||||
|
||||
(0..n).into_par_iter().for_each(|i| {
|
||||
match self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits) {
|
||||
Ok(0) => {}
|
||||
Ok(n_kmers) => {
|
||||
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
|
||||
let pb = pb.lock().unwrap();
|
||||
let order: Vec<usize> = (0..n).collect();
|
||||
let runner = crate::numa::PartitionRunner::new();
|
||||
runner.run(
|
||||
&order,
|
||||
|i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits),
|
||||
|i, n_kmers, _| {
|
||||
if n_kmers > 0 {
|
||||
total_kmers += n_kmers;
|
||||
pb.inc(1);
|
||||
pb.set_message(format!("{i}: {n_kmers} kmers"));
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("error building layer for partition {i}: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
});
|
||||
},
|
||||
).map_err(OKIError::Partition)?;
|
||||
|
||||
pb.lock().unwrap().finish_and_clear();
|
||||
info!(
|
||||
"done — {} total kmers indexed",
|
||||
total_kmers.load(Ordering::Relaxed)
|
||||
);
|
||||
pb.finish_and_clear();
|
||||
info!("done — {} total kmers indexed", total_kmers);
|
||||
|
||||
if !keep_intermediate {
|
||||
for i in 0..n {
|
||||
@@ -211,36 +203,25 @@ impl KmerIndex {
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
|
||||
let n = self.n_partitions();
|
||||
let errors: Vec<_> = (0..n)
|
||||
.into_par_iter()
|
||||
.filter_map(|i| {
|
||||
let order: Vec<usize> = (0..n).collect();
|
||||
crate::numa::PartitionRunner::new().run(
|
||||
&order,
|
||||
|i| -> OKIResult<()> {
|
||||
let index_dir = self.partition.part_dir(i).join("index");
|
||||
if !index_dir.exists() { return None; }
|
||||
let meta = match PartitionMeta::load(&index_dir) {
|
||||
Ok(m) => m,
|
||||
Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
|
||||
};
|
||||
if !index_dir.exists() { return Ok(()); }
|
||||
let meta = PartitionMeta::load(&index_dir)
|
||||
.map_err(|e| OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())))?;
|
||||
for l in 0..meta.n_layers {
|
||||
let layer_dir = index_dir.join(format!("layer_{l}"));
|
||||
let presence_dir = layer_dir.join("presence");
|
||||
let counts_dir = layer_dir.join("counts");
|
||||
if presence_dir.exists() {
|
||||
if let Err(e) = pack_bit_matrix(&presence_dir) {
|
||||
return Some(OKIError::Io(e));
|
||||
if presence_dir.exists() { pack_bit_matrix(&presence_dir).map_err(OKIError::Io)?; }
|
||||
if counts_dir.exists() { pack_compact_int_matrix(&counts_dir).map_err(OKIError::Io)?; }
|
||||
}
|
||||
}
|
||||
if counts_dir.exists() {
|
||||
if let Err(e) = pack_compact_int_matrix(&counts_dir) {
|
||||
return Some(OKIError::Io(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
|
||||
if let Some(e) = errors.into_iter().next() { return Err(e); }
|
||||
Ok(())
|
||||
},
|
||||
|_, _, _| {},
|
||||
)
|
||||
}
|
||||
|
||||
/// Write a `layer_meta.json` in any layer directory that is missing one.
|
||||
|
||||
+52
-156
@@ -1,4 +1,4 @@
|
||||
// NUMA-aware Rayon thread pools via hwlocality.
|
||||
// NUMA-aware partition runner via hwlocality.
|
||||
//
|
||||
// Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and
|
||||
// builds one Rayon ThreadPool per NUMA node with threads pinned to that node's
|
||||
@@ -10,15 +10,14 @@
|
||||
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
|
||||
// - any per-node pool fails to build
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crossbeam_channel::{RecvTimeoutError, unbounded};
|
||||
use crossbeam_channel::unbounded;
|
||||
use hwlocality::Topology;
|
||||
use hwlocality::cpu::binding::CpuBindingFlags;
|
||||
use hwlocality::cpu::cpuset::CpuSet;
|
||||
use hwlocality::object::types::ObjectType;
|
||||
use obisys::CpuSample;
|
||||
use tracing::debug;
|
||||
|
||||
// ── Public interface ──────────────────────────────────────────────────────────
|
||||
@@ -104,27 +103,6 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
|
||||
.ok()
|
||||
}
|
||||
|
||||
// ── Adaptive spawn heuristic ──────────────────────────────────────────────────
|
||||
//
|
||||
// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is
|
||||
// under-utilised). Subsequent workers: spawn only if the last worker raised
|
||||
// efficiency by at least the expected marginal gain (1/n_workers), with a
|
||||
// minimum floor to avoid spurious spawns from efficiency fluctuations.
|
||||
|
||||
const SPAWN_THRESHOLD: f64 = 0.95;
|
||||
const MIN_MARGINAL_GAIN: f64 = 0.03;
|
||||
const SPAWN_POLL: Duration = Duration::from_secs(20);
|
||||
|
||||
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
|
||||
if n_workers == 1 {
|
||||
eff < SPAWN_THRESHOLD
|
||||
} else {
|
||||
let gain = eff - eff_at_last_spawn;
|
||||
let expected = 1.0 / n_workers as f64;
|
||||
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
|
||||
}
|
||||
}
|
||||
|
||||
// ── PartitionRunner ───────────────────────────────────────────────────────────
|
||||
|
||||
struct NodeConfig {
|
||||
@@ -135,42 +113,38 @@ struct NodeConfig {
|
||||
|
||||
/// Generic NUMA-aware runner for partition-level parallel work.
|
||||
///
|
||||
/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result
|
||||
/// collection. UMA systems are handled as the degenerate case of a single node
|
||||
/// with no pinning.
|
||||
/// Workers are distributed round-robin across NUMA nodes and pinned to their
|
||||
/// node's CPUs. UMA systems are the degenerate case: one node, no pinning.
|
||||
///
|
||||
/// # Model
|
||||
/// # Termination
|
||||
///
|
||||
/// One controller thread per NUMA node (one total on UMA). Each controller
|
||||
/// manages up to `max_workers` dormant workers that drain a shared work queue.
|
||||
/// Workers are activated one at a time; a new worker is added when global CPU
|
||||
/// efficiency justifies it. On NUMA all workers are activated immediately
|
||||
/// (memory bandwidth, not CPU count, is the bottleneck).
|
||||
/// Termination is driven entirely by channel closure:
|
||||
///
|
||||
/// ```text
|
||||
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
||||
/// drop(result_tx) → result_rx closes → controller loop exits
|
||||
/// ```
|
||||
///
|
||||
/// No explicit counter or sentinel needed.
|
||||
pub struct PartitionRunner {
|
||||
nodes: Vec<NodeConfig>,
|
||||
n_cores: usize,
|
||||
}
|
||||
|
||||
impl PartitionRunner {
|
||||
/// Detect topology and build. Falls back to a single-node UMA runner on
|
||||
/// macOS, single-socket machines, or hwloc failure.
|
||||
/// Total number of pre-spawned worker slots across all nodes.
|
||||
/// Total worker slots across all nodes.
|
||||
pub fn max_workers(&self) -> usize {
|
||||
self.nodes.iter().map(|n| n.max_workers).sum()
|
||||
}
|
||||
|
||||
/// Detect topology and build. Falls back to a single-node UMA runner on
|
||||
/// macOS, single-socket machines, or hwloc failure.
|
||||
pub fn new() -> Self {
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
|
||||
match build() {
|
||||
Some(ns) => {
|
||||
let wpn = ns.workers_per_node();
|
||||
debug!(
|
||||
"PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
|
||||
ns.pools.len(),
|
||||
wpn,
|
||||
ns.pools.len(), wpn,
|
||||
);
|
||||
let nodes = ns.pools
|
||||
.into_iter()
|
||||
@@ -181,21 +155,20 @@ impl PartitionRunner {
|
||||
max_workers: wpn,
|
||||
})
|
||||
.collect();
|
||||
Self { nodes, n_cores }
|
||||
Self { nodes }
|
||||
}
|
||||
None => {
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
let max_workers = (n_cores / 2).max(1);
|
||||
debug!(
|
||||
"PartitionRunner: UMA mode — adaptive up to {} worker(s)",
|
||||
max_workers,
|
||||
);
|
||||
debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers);
|
||||
Self {
|
||||
nodes: vec![NodeConfig {
|
||||
pool: None,
|
||||
cpu_ids: vec![],
|
||||
max_workers,
|
||||
}],
|
||||
n_cores,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -203,19 +176,17 @@ impl PartitionRunner {
|
||||
|
||||
/// Run `f(i)` for every index in `order`.
|
||||
///
|
||||
/// `on_done(i, result, elapsed)` is called under an internal mutex as each
|
||||
/// partition completes — suitable for progress bars, logging, and result
|
||||
/// aggregation. No `Send` or `Sync` bound is required on the callback.
|
||||
///
|
||||
/// The work queue is shared across all NUMA nodes: any idle worker takes
|
||||
/// the next available partition regardless of node, ensuring load balance.
|
||||
/// Workers are spawned upfront and distributed round-robin across NUMA
|
||||
/// nodes. `on_done(i, result, elapsed)` is called from the controller
|
||||
/// thread as each partition completes — suitable for progress bars and
|
||||
/// result aggregation.
|
||||
///
|
||||
/// Returns the first error produced by `f`, if any.
|
||||
pub fn run<F, R, E, C>(
|
||||
&self,
|
||||
order: &[usize],
|
||||
f: F,
|
||||
on_done: C,
|
||||
mut on_done: C,
|
||||
) -> Result<(), E>
|
||||
where
|
||||
F: Fn(usize) -> Result<R, E> + Send + Sync,
|
||||
@@ -223,49 +194,29 @@ impl PartitionRunner {
|
||||
E: Send,
|
||||
C: FnMut(usize, R, Duration) + Send,
|
||||
{
|
||||
let f = Arc::new(f);
|
||||
let on_done = Arc::new(Mutex::new(on_done));
|
||||
let first_err: Arc<Mutex<Option<E>>> = Arc::new(Mutex::new(None));
|
||||
|
||||
// Shared work queue — pre-loaded in caller-supplied order.
|
||||
// Pre-load the work queue, then drop the sender so workers' part_rx
|
||||
// iterators terminate when the queue is drained.
|
||||
let (part_tx, part_rx) = unbounded::<usize>();
|
||||
for &i in order {
|
||||
part_tx.send(i).ok();
|
||||
}
|
||||
for &i in order { part_tx.send(i).ok(); }
|
||||
drop(part_tx);
|
||||
|
||||
let n_cores = self.n_cores;
|
||||
let (result_tx, result_rx) = unbounded::<(usize, Result<R, E>, Duration)>();
|
||||
let n_nodes = self.nodes.len();
|
||||
let f = &f; // shared borrow; F: Sync so concurrent calls are safe
|
||||
|
||||
let mut first_err: Option<E> = None;
|
||||
|
||||
std::thread::scope(|s| {
|
||||
for node in &self.nodes {
|
||||
let f = Arc::clone(&f);
|
||||
let on_done = Arc::clone(&on_done);
|
||||
let first_err = Arc::clone(&first_err);
|
||||
let part_rx = part_rx.clone();
|
||||
|
||||
s.spawn(move || {
|
||||
// Per-node result and activation channels.
|
||||
let (result_tx, result_rx) =
|
||||
unbounded::<(usize, Result<R, E>, Duration)>();
|
||||
let (activate_tx, activate_rx) = unbounded::<()>();
|
||||
|
||||
std::thread::scope(|ws| {
|
||||
// Pre-spawn workers (all dormant until activated).
|
||||
for _ in 0..node.max_workers {
|
||||
// Spawn all workers upfront, round-robin across NUMA nodes.
|
||||
for w in 0..self.max_workers() {
|
||||
let node = &self.nodes[w % n_nodes];
|
||||
let prx = part_rx.clone();
|
||||
let rtx = result_tx.clone();
|
||||
let arx = activate_rx.clone();
|
||||
let f = Arc::clone(&f);
|
||||
let pool = node.pool.clone();
|
||||
let cpu_ids = node.cpu_ids.clone();
|
||||
let cpu_ids = &node.cpu_ids;
|
||||
|
||||
ws.spawn(move || {
|
||||
if !cpu_ids.is_empty() {
|
||||
pin_current_thread(&cpu_ids);
|
||||
}
|
||||
if arx.recv().is_err() {
|
||||
return; // never activated — exit cleanly
|
||||
}
|
||||
s.spawn(move || {
|
||||
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
|
||||
for i in &prx {
|
||||
let t = Instant::now();
|
||||
let r = match &pool {
|
||||
@@ -276,78 +227,23 @@ impl PartitionRunner {
|
||||
}
|
||||
});
|
||||
}
|
||||
// Drop the controller's copy: result_rx disconnects
|
||||
// once all worker copies are also dropped (workers done).
|
||||
|
||||
// Drop the controller's sender: result_rx closes once all worker
|
||||
// rtx clones are dropped (i.e. all workers have exited).
|
||||
drop(result_tx);
|
||||
|
||||
// In NUMA mode activate all workers immediately;
|
||||
// in UMA mode activate one and grow adaptively.
|
||||
let numa_mode = node.pool.is_some();
|
||||
let initial = if numa_mode { node.max_workers } else { 1 };
|
||||
for _ in 0..initial {
|
||||
activate_tx.send(()).ok();
|
||||
}
|
||||
let mut active_workers = initial;
|
||||
let mut cpu_sample = CpuSample::now();
|
||||
let mut eff_at_last_spawn = 0.0f64;
|
||||
|
||||
// Controller loop.
|
||||
loop {
|
||||
match result_rx.recv_timeout(SPAWN_POLL) {
|
||||
Ok((i, r, dur)) => {
|
||||
// Drain results concurrently with workers. The for loop exits
|
||||
// when result_rx is disconnected — at that point all workers are
|
||||
// done and the scope join below is instantaneous.
|
||||
for (i, r, dur) in &result_rx {
|
||||
match r {
|
||||
Ok(v) => {
|
||||
on_done.lock().unwrap()(i, v, dur);
|
||||
}
|
||||
Err(e) => {
|
||||
let mut g = first_err.lock().unwrap();
|
||||
if g.is_none() { *g = Some(e); }
|
||||
Ok(v) => on_done(i, v, dur),
|
||||
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
|
||||
}
|
||||
}
|
||||
if !numa_mode && active_workers < node.max_workers {
|
||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
|
||||
debug!(
|
||||
"activated worker {} — efficiency {:.0}%",
|
||||
active_workers + 1,
|
||||
eff * 100.0,
|
||||
);
|
||||
activate_tx.send(()).ok();
|
||||
active_workers += 1;
|
||||
eff_at_last_spawn = eff;
|
||||
cpu_sample = CpuSample::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
if !numa_mode && active_workers < node.max_workers {
|
||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
|
||||
debug!(
|
||||
"activated worker {} (poll) — efficiency {:.0}%",
|
||||
active_workers + 1,
|
||||
eff * 100.0,
|
||||
);
|
||||
activate_tx.send(()).ok();
|
||||
active_workers += 1;
|
||||
eff_at_last_spawn = eff;
|
||||
cpu_sample = CpuSample::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
}
|
||||
}
|
||||
// Signal any dormant workers that were never activated
|
||||
// to exit (UMA mode where max_workers was never reached).
|
||||
drop(activate_tx);
|
||||
}); // ws: waits for all workers of this node
|
||||
});
|
||||
}
|
||||
}); // s: waits for all node controllers
|
||||
|
||||
let mut g = first_err.lock().unwrap();
|
||||
match g.take() {
|
||||
match first_err {
|
||||
Some(e) => Err(e),
|
||||
None => Ok(()),
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ use std::path::Path;
|
||||
|
||||
use obikpartitionner::{KmerFilter, KmerPartition, MergeMode};
|
||||
use obisys::{Reporter, Stage, progress_bar};
|
||||
use rayon::prelude::*;
|
||||
use tracing::info;
|
||||
|
||||
use crate::error::{OKIError, OKIResult};
|
||||
@@ -83,23 +82,16 @@ impl KmerIndex {
|
||||
let src_partition = &src.partition;
|
||||
let block_bits = meta.config.block_bits;
|
||||
|
||||
let errors: Vec<obiskio::SKError> = (0..n_partitions)
|
||||
.into_par_iter()
|
||||
.filter_map(|i| {
|
||||
let result = dst_partition
|
||||
.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits)
|
||||
.err();
|
||||
pb.inc(1);
|
||||
result
|
||||
})
|
||||
.collect();
|
||||
let order: Vec<usize> = (0..n_partitions).collect();
|
||||
let runner = crate::numa::PartitionRunner::new();
|
||||
runner.run(
|
||||
&order,
|
||||
|i| dst_partition.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits),
|
||||
|_, _, _| { pb.inc(1); },
|
||||
).map_err(OKIError::Partition)?;
|
||||
|
||||
pb.finish_and_clear();
|
||||
|
||||
if let Some(e) = errors.into_iter().next() {
|
||||
return Err(OKIError::Partition(e));
|
||||
}
|
||||
|
||||
rep.push(t.stop());
|
||||
|
||||
// Write SENTINEL_INDEXED — output is ready to use.
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::path::Path;
|
||||
use obilayeredmap::{IndexMode, layer::Layer};
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
use obisys::{Reporter, Stage, progress_bar};
|
||||
use rayon::prelude::*;
|
||||
use tracing::info;
|
||||
|
||||
use crate::error::{OKIError, OKIResult};
|
||||
@@ -45,25 +44,17 @@ impl KmerIndex {
|
||||
let t = Stage::start("reindex");
|
||||
let pb = progress_bar("reindex", n as u64, "partitions");
|
||||
|
||||
let errors: Vec<String> = (0..n)
|
||||
.into_par_iter()
|
||||
.filter_map(|i| {
|
||||
let res = reindex_partition(
|
||||
&self.partition.part_dir(i).join("index"),
|
||||
&target,
|
||||
block_bits,
|
||||
);
|
||||
pb.inc(1);
|
||||
res.err().map(|e| format!("partition {i}: {e}"))
|
||||
})
|
||||
.collect();
|
||||
let order: Vec<usize> = (0..n).collect();
|
||||
let runner = crate::numa::PartitionRunner::new();
|
||||
runner.run(
|
||||
&order,
|
||||
|i| reindex_partition(&self.partition.part_dir(i).join("index"), &target, block_bits)
|
||||
.map_err(|e| OKIError::InvalidInput(format!("partition {i}: {e}"))),
|
||||
|_, _, _| { pb.inc(1); },
|
||||
)?;
|
||||
|
||||
pb.finish_and_clear();
|
||||
|
||||
if let Some(e) = errors.into_iter().next() {
|
||||
return Err(OKIError::InvalidInput(e));
|
||||
}
|
||||
|
||||
self.meta.config.evidence = target;
|
||||
if matches!(self.meta.config.evidence, IndexMode::Exact) {
|
||||
self.meta.config.block_bits = block_bits;
|
||||
|
||||
+15
-33
@@ -4,7 +4,6 @@ use std::path::Path;
|
||||
|
||||
use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR};
|
||||
use obisys::{Stage, progress_bar};
|
||||
use rayon::prelude::*;
|
||||
use tracing::info;
|
||||
|
||||
use crate::error::{OKIError, OKIResult};
|
||||
@@ -72,25 +71,16 @@ impl KmerIndex {
|
||||
let pb = progress_bar("select", n_partitions as u64, "partitions");
|
||||
let src_partition = &src.partition;
|
||||
|
||||
let errors: Vec<obiskio::SKError> = (0..n_partitions)
|
||||
.into_par_iter()
|
||||
.filter_map(|i| {
|
||||
let result = dst_partition.select_partition(
|
||||
src_partition, i, specs,
|
||||
n_src_genomes, threshold, output_presence,
|
||||
false,
|
||||
);
|
||||
pb.inc(1);
|
||||
result.err()
|
||||
})
|
||||
.collect();
|
||||
let order: Vec<usize> = (0..n_partitions).collect();
|
||||
let runner = crate::numa::PartitionRunner::new();
|
||||
runner.run(
|
||||
&order,
|
||||
|i| dst_partition.select_partition(src_partition, i, specs, n_src_genomes, threshold, output_presence, false),
|
||||
|_, _, _| { pb.inc(1); },
|
||||
).map_err(OKIError::Partition)?;
|
||||
|
||||
pb.finish_and_clear();
|
||||
|
||||
if let Some(e) = errors.into_iter().next() {
|
||||
return Err(OKIError::Partition(e));
|
||||
}
|
||||
|
||||
let _ = t.stop();
|
||||
|
||||
fs::File::create(output.join(SENTINEL_INDEXED))?;
|
||||
@@ -132,25 +122,17 @@ impl KmerIndex {
|
||||
let t = Stage::start("select");
|
||||
let pb = progress_bar("select", n_partitions as u64, "partitions");
|
||||
|
||||
let errors: Vec<obiskio::SKError> = (0..n_partitions)
|
||||
.into_par_iter()
|
||||
.filter_map(|i| {
|
||||
let result = self.partition.select_partition(
|
||||
&src_partition, i, specs,
|
||||
n_src_genomes, threshold, output_presence,
|
||||
true,
|
||||
);
|
||||
pb.inc(1);
|
||||
result.err()
|
||||
})
|
||||
.collect();
|
||||
let partition = &self.partition;
|
||||
let order: Vec<usize> = (0..n_partitions).collect();
|
||||
let runner = crate::numa::PartitionRunner::new();
|
||||
runner.run(
|
||||
&order,
|
||||
|i| partition.select_partition(&src_partition, i, specs, n_src_genomes, threshold, output_presence, true),
|
||||
|_, _, _| { pb.inc(1); },
|
||||
).map_err(OKIError::Partition)?;
|
||||
|
||||
pb.finish_and_clear();
|
||||
|
||||
if let Some(e) = errors.into_iter().next() {
|
||||
return Err(OKIError::Partition(e));
|
||||
}
|
||||
|
||||
let _ = t.stop();
|
||||
|
||||
// Update index.meta with new genome list and with_counts flag.
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use obicompactvec::{PersistentBitVecBuilder, PersistentCompactIntVecBuilder};
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
use obilayeredmap::{IndexMode, OLMError};
|
||||
use obiskio::{SKError, SKResult};
|
||||
|
||||
// ── olm_to_sk ────────────────────────────────────────────────────────────────
|
||||
|
||||
pub(crate) fn olm_to_sk(e: OLMError, context: &'static str) -> SKError {
|
||||
match e {
|
||||
OLMError::Io(e) => SKError::Io(e),
|
||||
other => SKError::InvalidData {
|
||||
context,
|
||||
detail: other.to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ── load_meta ────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Load PartitionMeta, or recover it by probing layer directories.
|
||||
/// Indexes built before meta.json was introduced lack the file.
|
||||
pub(crate) fn load_meta(dir: &Path, context: &'static str) -> SKResult<PartitionMeta> {
|
||||
match PartitionMeta::load(dir) {
|
||||
Ok(m) => Ok(m),
|
||||
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
|
||||
{
|
||||
let mut n = 0usize;
|
||||
while dir.join(format!("layer_{n}")).exists() {
|
||||
n += 1;
|
||||
}
|
||||
let m = PartitionMeta {
|
||||
n_layers: n,
|
||||
mode: IndexMode::default(),
|
||||
};
|
||||
m.save(dir).map_err(|e| olm_to_sk(e, context))?;
|
||||
Ok(m)
|
||||
}
|
||||
Err(e) => Err(olm_to_sk(e, context)),
|
||||
}
|
||||
}
|
||||
|
||||
// ── path helpers ──────────────────────────────────────────────────────────────
|
||||
|
||||
pub(crate) fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
|
||||
dir.join(format!("col_{col:06}.pbiv"))
|
||||
}
|
||||
|
||||
pub(crate) fn col_path_int(dir: &Path, col: usize) -> PathBuf {
|
||||
dir.join(format!("col_{col:06}.pciv"))
|
||||
}
|
||||
|
||||
pub(crate) fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
|
||||
fs::write(
|
||||
dir.join("meta.json"),
|
||||
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
|
||||
)
|
||||
}
|
||||
|
||||
// ── ColBuilder ────────────────────────────────────────────────────────────────
|
||||
|
||||
pub(crate) enum ColBuilder {
|
||||
Bit(PersistentBitVecBuilder),
|
||||
Int(PersistentCompactIntVecBuilder),
|
||||
}
|
||||
|
||||
impl ColBuilder {
|
||||
pub(crate) fn set_val(&mut self, slot: usize, value: u32) {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.set(slot, value > 0),
|
||||
ColBuilder::Int(b) => b.set(slot, value),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn close(self) -> SKResult<()> {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
|
||||
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use tracing::debug;
|
||||
use obipipeline::{
|
||||
Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
|
||||
make_sink, make_source, make_transform,
|
||||
throttle,
|
||||
};
|
||||
|
||||
use obidebruinj::GraphDeBruijn;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obilayeredmap::{IndexMode, Layer};
|
||||
use obiskio::{SKError, SKResult};
|
||||
|
||||
use crate::common::olm_to_sk;
|
||||
|
||||
// ── KmerGraphData ─────────────────────────────────────────────────────────────
|
||||
|
||||
enum KmerGraphData {
|
||||
File(obipipeline::Throttled<PathBuf>),
|
||||
RawBatch(Vec<CanonicalKmer>),
|
||||
FilteredBatch(Vec<CanonicalKmer>),
|
||||
}
|
||||
|
||||
// ── build_graph ───────────────────────────────────────────────────────────────
|
||||
|
||||
/// Phase 1: pipeline that reads files, filters kmers, pushes into a GraphDeBruijn.
|
||||
///
|
||||
/// `flat_fn(path, emit)`: opens path, iterates kmers, calls `emit(batch)` for each batch.
|
||||
/// `filter(kmer) -> bool`: secondary filter applied in the Transform stage.
|
||||
pub(crate) fn build_graph<I, F, G>(
|
||||
file_source: I,
|
||||
flat_fn: F,
|
||||
filter: G,
|
||||
n_workers: usize,
|
||||
max_open: usize,
|
||||
) -> SKResult<GraphDeBruijn>
|
||||
where
|
||||
I: Iterator<Item = PathBuf> + Send + 'static,
|
||||
F: Fn(PathBuf, &mut dyn FnMut(Vec<CanonicalKmer>)) -> SKResult<()> + Send + Sync + 'static,
|
||||
G: Fn(CanonicalKmer) -> bool + Send + Sync + 'static,
|
||||
{
|
||||
let capacity = 2;
|
||||
|
||||
let flat_fn = Arc::new(flat_fn);
|
||||
let filter = Arc::new(filter);
|
||||
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
|
||||
let g_sink = Arc::clone(&g_shared);
|
||||
let err_cap: Arc<Mutex<Option<SKError>>> = Arc::new(Mutex::new(None));
|
||||
let err_flat = Arc::clone(&err_cap);
|
||||
|
||||
let throttled = throttle(file_source, max_open);
|
||||
|
||||
let pipeline = Pipeline::new(
|
||||
make_source!(KmerGraphData, throttled, File),
|
||||
vec![
|
||||
Stage::Flat(Arc::new(
|
||||
move |data: KmerGraphData,
|
||||
push: &PipelineSender<Result<KmerGraphData, PipelineError>>,
|
||||
delta: &PipelineSender<isize>|
|
||||
{
|
||||
if let KmerGraphData::File(t) = data {
|
||||
let path = t.item;
|
||||
let _guard = t.guard; // released at end of block
|
||||
let mut count: isize = 0;
|
||||
let push_clone = push.clone();
|
||||
let result = flat_fn(path, &mut |batch: Vec<CanonicalKmer>| {
|
||||
push_clone.send(Ok(KmerGraphData::RawBatch(batch))).ok();
|
||||
count += 1;
|
||||
});
|
||||
match result {
|
||||
Ok(()) => {
|
||||
delta.send(count - 1).ok();
|
||||
}
|
||||
Err(e) => {
|
||||
*err_flat.lock().unwrap() = Some(e);
|
||||
delta.send(-1).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
) as SharedFlatFn<KmerGraphData>),
|
||||
make_transform!(KmerGraphData, {
|
||||
let filter = Arc::clone(&filter);
|
||||
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
|
||||
batch.into_iter().filter(|k| filter(*k)).collect()
|
||||
}
|
||||
}, RawBatch, FilteredBatch),
|
||||
],
|
||||
make_sink!(KmerGraphData, {
|
||||
move |batch: Vec<CanonicalKmer>| {
|
||||
let mut g = g_sink.lock().unwrap();
|
||||
for kmer in batch {
|
||||
g.push(kmer);
|
||||
}
|
||||
}
|
||||
}, FilteredBatch),
|
||||
);
|
||||
|
||||
WorkerPool::new(pipeline, n_workers, capacity).run();
|
||||
|
||||
if let Some(e) = Arc::try_unwrap(err_cap)
|
||||
.unwrap_or_else(|_| panic!("build_graph: err_cap not uniquely owned after pipeline"))
|
||||
.into_inner()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
{
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let g = Arc::try_unwrap(g_shared)
|
||||
.unwrap_or_else(|_| panic!("build_graph: g_shared not uniquely owned after pipeline"))
|
||||
.into_inner()
|
||||
.unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
Ok(g)
|
||||
}
|
||||
|
||||
// ── write_graph_as_unitigs ────────────────────────────────────────────────────
|
||||
|
||||
/// Phase 2 (write unitigs only): compute degrees, write unitigs to `layer_dir`, drop graph.
|
||||
///
|
||||
/// Returns n_kmers. Does NOT build the MPHF — caller does it.
|
||||
pub(crate) fn write_graph_as_unitigs(g: GraphDeBruijn, layer_dir: &Path) -> SKResult<usize> {
|
||||
let n_kmers = g.len();
|
||||
g.compute_degrees_and_mark_starts();
|
||||
std::fs::create_dir_all(layer_dir)?;
|
||||
let mut uw = Layer::<()>::unitig_writer(layer_dir).map_err(|e| olm_to_sk(e, "graph pipeline"))?;
|
||||
g.try_for_each_unitig(|unitig| uw.write(unitig))?;
|
||||
uw.close()?;
|
||||
drop(g);
|
||||
Ok(n_kmers)
|
||||
}
|
||||
|
||||
// ── materialize_layer ─────────────────────────────────────────────────────────
|
||||
|
||||
/// Phase 2 (full): write_graph_as_unitigs + `Layer::<()>::build`.
|
||||
///
|
||||
/// Returns n_kmers.
|
||||
pub(crate) fn materialize_layer(
|
||||
g: GraphDeBruijn,
|
||||
layer_dir: &Path,
|
||||
block_bits: u8,
|
||||
evidence: &IndexMode,
|
||||
) -> SKResult<usize> {
|
||||
let n = write_graph_as_unitigs(g, layer_dir)?;
|
||||
debug!("materialize_layer: unitigs written ({n} kmers), building MPHF");
|
||||
Layer::<()>::build(layer_dir, block_bits, evidence)
|
||||
.map_err(|e| olm_to_sk(e, "graph pipeline"))?;
|
||||
debug!("materialize_layer: MPHF build done");
|
||||
Ok(n)
|
||||
}
|
||||
@@ -6,24 +6,16 @@ use epserde::prelude::*;
|
||||
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
|
||||
use obidebruinj::GraphDeBruijn;
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
use obilayeredmap::{IndexMode, OLMError, layer::Layer};
|
||||
use obilayeredmap::{IndexMode, layer::Layer};
|
||||
use obiskio::{SKError, SKFileMeta, SKFileReader};
|
||||
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
|
||||
|
||||
use crate::common::olm_to_sk;
|
||||
use crate::graph_pipeline::{materialize_layer, write_graph_as_unitigs};
|
||||
use crate::partition::KmerPartition;
|
||||
|
||||
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
||||
|
||||
fn olm_to_sk(e: OLMError) -> SKError {
|
||||
match e {
|
||||
OLMError::Io(io_err) => SKError::Io(io_err),
|
||||
other => SKError::InvalidData {
|
||||
context: "layer build",
|
||||
detail: other.to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_if_exists(path: &std::path::Path) {
|
||||
if let Err(e) = fs::remove_file(path) {
|
||||
if e.kind() != io::ErrorKind::NotFound {
|
||||
@@ -101,18 +93,8 @@ impl KmerPartition {
|
||||
}
|
||||
}
|
||||
|
||||
let n_kmers = g.len();
|
||||
g.compute_degrees_and_mark_starts();
|
||||
|
||||
fs::create_dir_all(&layer_dir)?;
|
||||
|
||||
let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?;
|
||||
g.try_for_each_unitig(|unitig| {
|
||||
uw.write(unitig)
|
||||
})?;
|
||||
uw.close()?;
|
||||
|
||||
if with_counts {
|
||||
let n_kmers = if with_counts {
|
||||
let n = write_graph_as_unitigs(g, &layer_dir)?;
|
||||
Layer::<PersistentCompactIntMatrix>::build(
|
||||
&layer_dir,
|
||||
block_bits,
|
||||
@@ -122,10 +104,11 @@ impl KmerPartition {
|
||||
_ => 1,
|
||||
},
|
||||
)
|
||||
.map_err(olm_to_sk)?;
|
||||
.map_err(|e| olm_to_sk(e, "layer build"))?;
|
||||
n
|
||||
} else {
|
||||
Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?;
|
||||
}
|
||||
materialize_layer(g, &layer_dir, block_bits, mode)?
|
||||
};
|
||||
|
||||
let index_dir = layer_dir.parent().expect("layer_dir has a parent");
|
||||
PartitionMeta {
|
||||
@@ -133,7 +116,7 @@ impl KmerPartition {
|
||||
mode: mode.clone(),
|
||||
}
|
||||
.save(index_dir)
|
||||
.map_err(olm_to_sk)?;
|
||||
.map_err(|e| olm_to_sk(e, "layer build"))?;
|
||||
|
||||
Ok(n_kmers)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
pub mod filter;
|
||||
mod common;
|
||||
mod distance;
|
||||
mod dump_layer;
|
||||
mod graph_pipeline;
|
||||
mod index_layer;
|
||||
mod kmer_sort;
|
||||
mod merge_layer;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
@@ -14,12 +13,13 @@ use obicompactvec::{
|
||||
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
|
||||
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
|
||||
};
|
||||
use obidebruinj::GraphDeBruijn;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError};
|
||||
use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly};
|
||||
use obiskio::{SKError, SKResult, UnitigFileReader};
|
||||
|
||||
use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta};
|
||||
use crate::graph_pipeline::{build_graph, materialize_layer};
|
||||
use crate::partition::KmerPartition;
|
||||
|
||||
// ── MergeMode ─────────────────────────────────────────────────────────────────
|
||||
@@ -30,29 +30,6 @@ pub enum MergeMode {
|
||||
Count,
|
||||
}
|
||||
|
||||
// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ─────────────
|
||||
|
||||
enum ColBuilder {
|
||||
Bit(PersistentBitVecBuilder),
|
||||
Int(PersistentCompactIntVecBuilder),
|
||||
}
|
||||
|
||||
impl ColBuilder {
|
||||
fn set_val(&mut self, slot: usize, value: u32) {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.set(slot, value > 0),
|
||||
ColBuilder::Int(b) => b.set(slot, value),
|
||||
}
|
||||
}
|
||||
|
||||
fn close(self) -> SKResult<()> {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
|
||||
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── SrcLayerData — opened source matrix for pass-2 lookup ─────────────────────
|
||||
|
||||
pub(crate) enum SrcLayerData {
|
||||
@@ -66,18 +43,18 @@ impl SrcLayerData {
|
||||
match merge_mode {
|
||||
MergeMode::Presence => {
|
||||
if counts_dir.exists() && !layer_dir.join("presence").exists() {
|
||||
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
|
||||
let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
|
||||
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
|
||||
Ok(SrcLayerData::Count(mphf, mat))
|
||||
} else {
|
||||
// presence dir exists, or neither exists → Implicit handled by open()
|
||||
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
|
||||
let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
|
||||
let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?;
|
||||
Ok(SrcLayerData::Presence(mphf, mat))
|
||||
}
|
||||
}
|
||||
MergeMode::Count => {
|
||||
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
|
||||
let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
|
||||
if counts_dir.exists() {
|
||||
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
|
||||
Ok(SrcLayerData::Count(mphf, mat))
|
||||
@@ -107,53 +84,6 @@ impl SrcLayerData {
|
||||
|
||||
const INDEX_SUBDIR: &str = "index";
|
||||
|
||||
/// Load PartitionMeta, or recover it by probing layer directories.
|
||||
/// Indexes built before meta.json was introduced lack the file.
|
||||
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
|
||||
match PartitionMeta::load(dir) {
|
||||
Ok(m) => Ok(m),
|
||||
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
|
||||
{
|
||||
let mut n = 0usize;
|
||||
while dir.join(format!("layer_{n}")).exists() {
|
||||
n += 1;
|
||||
}
|
||||
let m = PartitionMeta {
|
||||
n_layers: n,
|
||||
mode: IndexMode::default(),
|
||||
};
|
||||
m.save(dir).map_err(olm_to_sk)?;
|
||||
Ok(m)
|
||||
}
|
||||
Err(e) => Err(olm_to_sk(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn olm_to_sk(e: OLMError) -> SKError {
|
||||
match e {
|
||||
OLMError::Io(e) => SKError::Io(e),
|
||||
other => SKError::InvalidData {
|
||||
context: "merge",
|
||||
detail: other.to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
|
||||
dir.join(format!("col_{col:06}.pbiv"))
|
||||
}
|
||||
|
||||
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
|
||||
dir.join(format!("col_{col:06}.pciv"))
|
||||
}
|
||||
|
||||
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
|
||||
fs::write(
|
||||
dir.join("meta.json"),
|
||||
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
|
||||
)
|
||||
}
|
||||
|
||||
// ── KmerPartition::merge_partition ────────────────────────────────────────────
|
||||
|
||||
impl KmerPartition {
|
||||
@@ -180,8 +110,8 @@ impl KmerPartition {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open
|
||||
let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?);
|
||||
load_meta(&dst_index_dir, "merge")?; // ensure meta.json exists before LayeredMap::open
|
||||
let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?);
|
||||
let n_dst_layers = dst_map.n_layers();
|
||||
let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum();
|
||||
|
||||
@@ -191,7 +121,7 @@ impl KmerPartition {
|
||||
for l in 0..n_dst_layers {
|
||||
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
|
||||
Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n())
|
||||
.map_err(olm_to_sk)?;
|
||||
.map_err(|e| olm_to_sk(e, "merge"))?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,7 +139,7 @@ impl KmerPartition {
|
||||
if !src_index_dir.exists() {
|
||||
continue;
|
||||
}
|
||||
let src_meta = load_meta(&src_index_dir)?;
|
||||
let src_meta = load_meta(&src_index_dir, "merge")?;
|
||||
for l in 0..src_meta.n_layers {
|
||||
let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin");
|
||||
if p.exists() {
|
||||
@@ -221,92 +151,36 @@ impl KmerPartition {
|
||||
let n_src_layers = unitig_paths.len();
|
||||
debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)");
|
||||
|
||||
enum Pass1Data {
|
||||
File((PathBuf, ThrottleGuard)),
|
||||
Batch(Vec<CanonicalKmer>),
|
||||
NewKmers(Vec<CanonicalKmer>),
|
||||
}
|
||||
|
||||
const BATCH: usize = 4096;
|
||||
let n_workers = rayon::current_num_threads().min(16).max(4);
|
||||
let capacity = 2;
|
||||
// At most 2 files open simultaneously: keeps n_workers-2 workers free
|
||||
// for the Transform stage. Each open file monopolises one worker for the
|
||||
// full duration of its read, so this must stay well below n_workers.
|
||||
let max_open = 2;
|
||||
|
||||
let dst_filter = Arc::clone(&dst_map);
|
||||
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
|
||||
let g_sink = Arc::clone(&g_shared);
|
||||
let pass1_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
||||
let err_cap = Arc::clone(&pass1_err);
|
||||
|
||||
let throttled_paths = throttle(unitig_paths.into_iter(), max_open);
|
||||
|
||||
let pipeline = Pipeline::new(
|
||||
make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File),
|
||||
vec![
|
||||
Stage::Flat(Arc::new(
|
||||
move |data: Pass1Data,
|
||||
push: &PipelineSender<Result<Pass1Data, PipelineError>>,
|
||||
delta: &PipelineSender<isize>|
|
||||
{
|
||||
if let Pass1Data::File((path, _guard)) = data {
|
||||
// _guard is dropped at end of this block, releasing the slot.
|
||||
let reader = match UnitigFileReader::open_sequential(&path) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
*err_cap.lock().unwrap() = Some(e.to_string());
|
||||
delta.send(-1).ok();
|
||||
return;
|
||||
}
|
||||
};
|
||||
let g = build_graph(
|
||||
unitig_paths.into_iter(),
|
||||
move |path: PathBuf, emit: &mut dyn FnMut(Vec<CanonicalKmer>)| -> SKResult<()> {
|
||||
let reader = UnitigFileReader::open_sequential(&path)?;
|
||||
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
|
||||
let mut count: isize = 0;
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
batch.push(kmer);
|
||||
if batch.len() == BATCH {
|
||||
let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH));
|
||||
push.send(Ok(Pass1Data::Batch(b))).ok();
|
||||
count += 1;
|
||||
emit(std::mem::replace(&mut batch, Vec::with_capacity(BATCH)));
|
||||
}
|
||||
}
|
||||
if !batch.is_empty() {
|
||||
push.send(Ok(Pass1Data::Batch(batch))).ok();
|
||||
count += 1;
|
||||
emit(batch);
|
||||
}
|
||||
delta.send(count - 1).ok();
|
||||
}
|
||||
}
|
||||
) as SharedFlatFn<Pass1Data>),
|
||||
make_transform!(Pass1Data, {
|
||||
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
|
||||
batch.into_iter()
|
||||
.filter(|&k| dst_filter.query(k).is_none())
|
||||
.collect()
|
||||
}
|
||||
}, Batch, NewKmers),
|
||||
],
|
||||
make_sink!(Pass1Data, {
|
||||
move |batch: Vec<CanonicalKmer>| {
|
||||
let mut g = g_sink.lock().unwrap();
|
||||
for kmer in batch {
|
||||
g.push(kmer);
|
||||
}
|
||||
}
|
||||
}, NewKmers),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
move |kmer| dst_filter.query(kmer).is_none(),
|
||||
n_workers,
|
||||
max_open,
|
||||
)?;
|
||||
|
||||
WorkerPool::new(pipeline, n_workers, capacity).run();
|
||||
|
||||
if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() {
|
||||
return Err(SKError::InvalidData { context: "merge pass1", detail: msg });
|
||||
}
|
||||
|
||||
let g = Arc::try_unwrap(g_shared)
|
||||
.unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline"))
|
||||
.into_inner()
|
||||
.unwrap_or_else(|e| e.into_inner());
|
||||
let any_new = g.len() > 0;
|
||||
debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len());
|
||||
|
||||
@@ -315,25 +189,8 @@ impl KmerPartition {
|
||||
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
|
||||
|
||||
let n_new = if any_new {
|
||||
let t_deg = std::time::Instant::now();
|
||||
g.compute_degrees_and_mark_starts();
|
||||
debug!("partition {i}: compute_degrees in {:.3}s — {} nodes",
|
||||
t_deg.elapsed().as_secs_f64(), g.len());
|
||||
fs::create_dir_all(&new_layer_dir)?;
|
||||
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
|
||||
debug!("partition {i}: unitig traversal start — {} nodes", g.len());
|
||||
let mut n_unitigs = 0usize;
|
||||
g.try_for_each_unitig(|unitig| {
|
||||
n_unitigs += 1;
|
||||
uw.write(unitig)
|
||||
})?;
|
||||
debug!("partition {i}: unitig writer closing");
|
||||
uw.close()?;
|
||||
let n_nodes = g.len();
|
||||
debug!("partition {i}: unitig writer closed — dropping graph ({n_nodes} nodes)");
|
||||
drop(g);
|
||||
debug!("partition {i}: graph dropped — starting MPHF build ({n_unitigs} unitigs)");
|
||||
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
|
||||
let n_nodes = materialize_layer(g, &new_layer_dir, block_bits, evidence)?;
|
||||
debug!("partition {i}: MPHF build done");
|
||||
n_nodes
|
||||
} else {
|
||||
@@ -343,7 +200,7 @@ impl KmerPartition {
|
||||
|
||||
let t_open = std::time::Instant::now();
|
||||
let new_mphf: Option<Arc<MphfOnly>> = if any_new {
|
||||
Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?))
|
||||
Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(|e| olm_to_sk(e, "merge"))?))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -449,7 +306,7 @@ impl KmerPartition {
|
||||
col_offset += src_n;
|
||||
continue;
|
||||
}
|
||||
let src_meta = load_meta(&src_index_dir)?;
|
||||
let src_meta = load_meta(&src_index_dir, "merge")?;
|
||||
for l in 0..src_meta.n_layers {
|
||||
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
||||
if src_layer_dir.join("unitigs.bin").exists() {
|
||||
@@ -483,6 +340,7 @@ impl KmerPartition {
|
||||
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
||||
let err_cap2 = Arc::clone(&pass2_err);
|
||||
|
||||
let capacity = 2;
|
||||
let throttled_pass2 = throttle(pass2_items.into_iter(), max_open);
|
||||
|
||||
let pipeline2 = Pipeline::new(
|
||||
@@ -516,6 +374,7 @@ impl KmerPartition {
|
||||
return;
|
||||
}
|
||||
};
|
||||
const BATCH: usize = 4096;
|
||||
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
|
||||
let mut count: isize = 0;
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
@@ -617,9 +476,9 @@ impl KmerPartition {
|
||||
write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total)
|
||||
.map_err(SKError::Io)?;
|
||||
|
||||
let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?;
|
||||
let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?;
|
||||
part_meta.n_layers = new_layer_idx + 1;
|
||||
part_meta.save(&dst_index_dir).map_err(olm_to_sk)?;
|
||||
part_meta.save(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?;
|
||||
}
|
||||
|
||||
debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64());
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::Path;
|
||||
|
||||
use obicompactvec::{
|
||||
PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder,
|
||||
@@ -9,85 +7,17 @@ use obicompactvec::{
|
||||
use obidebruinj::GraphDeBruijn;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError};
|
||||
use obilayeredmap::{IndexMode, MphfLayer};
|
||||
use obiskio::{SKError, SKResult, UnitigFileReader};
|
||||
|
||||
use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta};
|
||||
use crate::filter::{KmerFilter, passes_all};
|
||||
use crate::graph_pipeline::materialize_layer;
|
||||
use crate::merge_layer::{MergeMode, SrcLayerData};
|
||||
use crate::partition::KmerPartition;
|
||||
|
||||
const INDEX_SUBDIR: &str = "index";
|
||||
|
||||
fn olm_to_sk(e: OLMError) -> SKError {
|
||||
match e {
|
||||
OLMError::Io(e) => SKError::Io(e),
|
||||
other => SKError::InvalidData {
|
||||
context: "rebuild",
|
||||
detail: other.to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
|
||||
dir.join(format!("col_{col:06}.pbiv"))
|
||||
}
|
||||
|
||||
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
|
||||
dir.join(format!("col_{col:06}.pciv"))
|
||||
}
|
||||
|
||||
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
|
||||
fs::write(
|
||||
dir.join("meta.json"),
|
||||
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
|
||||
)
|
||||
}
|
||||
|
||||
// ── ColBuilder ────────────────────────────────────────────────────────────────
|
||||
|
||||
enum ColBuilder {
|
||||
Bit(PersistentBitVecBuilder),
|
||||
Int(PersistentCompactIntVecBuilder),
|
||||
}
|
||||
|
||||
impl ColBuilder {
|
||||
fn set_val(&mut self, slot: usize, value: u32) {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.set(slot, value > 0),
|
||||
ColBuilder::Int(b) => b.set(slot, value),
|
||||
}
|
||||
}
|
||||
|
||||
fn close(self) -> SKResult<()> {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
|
||||
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
|
||||
match PartitionMeta::load(dir) {
|
||||
Ok(m) => Ok(m),
|
||||
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
|
||||
{
|
||||
let mut n = 0usize;
|
||||
while dir.join(format!("layer_{n}")).exists() {
|
||||
n += 1;
|
||||
}
|
||||
let m = PartitionMeta {
|
||||
n_layers: n,
|
||||
mode: IndexMode::default(),
|
||||
};
|
||||
m.save(dir).map_err(olm_to_sk)?;
|
||||
Ok(m)
|
||||
}
|
||||
Err(e) => Err(olm_to_sk(e)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`.
|
||||
///
|
||||
/// Uses [`SrcLayerData`] semantics: counts take priority over presence when
|
||||
@@ -99,7 +29,7 @@ fn iter_src_layers(
|
||||
filters: &[Box<dyn KmerFilter>],
|
||||
mut cb: impl FnMut(CanonicalKmer, Box<[u32]>),
|
||||
) -> SKResult<()> {
|
||||
let src_meta = load_meta(src_index_dir)?;
|
||||
let src_meta = load_meta(src_index_dir, "rebuild")?;
|
||||
for l in 0..src_meta.n_layers {
|
||||
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
||||
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
||||
@@ -144,7 +74,7 @@ impl KmerPartition {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let src_meta = load_meta(&src_index_dir)?;
|
||||
let src_meta = load_meta(&src_index_dir, "rebuild")?;
|
||||
if src_meta.n_layers == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -159,30 +89,20 @@ impl KmerPartition {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let n_new = g.len();
|
||||
g.compute_degrees_and_mark_starts();
|
||||
|
||||
// ── Build MPHF in dst layer_0 ─────────────────────────────────────────
|
||||
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
|
||||
let dst_layer_dir = dst_index_dir.join("layer_0");
|
||||
fs::create_dir_all(&dst_layer_dir)?;
|
||||
|
||||
let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?;
|
||||
g.try_for_each_unitig(|unitig| {
|
||||
uw.write(unitig)
|
||||
})?;
|
||||
uw.close()?;
|
||||
drop(g);
|
||||
|
||||
Layer::<()>::build(&dst_layer_dir, block_bits, &IndexMode::Exact).map_err(olm_to_sk)?;
|
||||
let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact).map_err(olm_to_sk)?;
|
||||
let n_new = materialize_layer(g, &dst_layer_dir, block_bits, &IndexMode::Exact)?;
|
||||
let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact)
|
||||
.map_err(|e| olm_to_sk(e, "rebuild"))?;
|
||||
|
||||
// ── Prepare matrix builders (one column per genome) ───────────────────
|
||||
let data_dir = match mode {
|
||||
MergeMode::Presence => dst_layer_dir.join("presence"),
|
||||
MergeMode::Count => dst_layer_dir.join("counts"),
|
||||
};
|
||||
fs::create_dir_all(&data_dir)?;
|
||||
std::fs::create_dir_all(&data_dir)?;
|
||||
|
||||
let mut builders: Vec<ColBuilder> = match mode {
|
||||
MergeMode::Presence => {
|
||||
@@ -234,7 +154,7 @@ impl KmerPartition {
|
||||
mode: IndexMode::Exact,
|
||||
}
|
||||
.save(&dst_index_dir)
|
||||
.map_err(olm_to_sk)?;
|
||||
.map_err(|e| olm_to_sk(e, "rebuild"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user