refactor: restructure partitioner with shared utilities and pipeline

This commit restructures the partitioner crate by extracting shared utilities and the `ColBuilder` enum into a new `common` module. It introduces a multi-phase `graph_pipeline` for constructing and materializing De Bruijn graphs, replacing manual graph construction in `index_layer`, `merge_layer`, and `rebuild_layer`. All layer workflows now use centralized `build_graph` and `materialize_layer` abstractions, with standardized error context strings for improved diagnostics.
This commit is contained in:
Eric Coissac
2026-06-15 14:01:43 +02:00
parent a9567ad023
commit bc92dc4592
6 changed files with 295 additions and 295 deletions
+84
View File
@@ -0,0 +1,84 @@
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use obicompactvec::{PersistentBitVecBuilder, PersistentCompactIntVecBuilder};
use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, OLMError};
use obiskio::{SKError, SKResult};
// ── olm_to_sk ────────────────────────────────────────────────────────────────
pub(crate) fn olm_to_sk(e: OLMError, context: &'static str) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData {
context,
detail: other.to_string(),
},
}
}
// ── load_meta ────────────────────────────────────────────────────────────────
/// Load PartitionMeta, or recover it by probing layer directories.
/// Indexes built before meta.json was introduced lack the file.
pub(crate) fn load_meta(dir: &Path, context: &'static str) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) {
Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() {
n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(|e| olm_to_sk(e, context))?;
Ok(m)
}
Err(e) => Err(olm_to_sk(e, context)),
}
}
// ── path helpers ──────────────────────────────────────────────────────────────
pub(crate) fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
pub(crate) fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
pub(crate) fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
}
// ── ColBuilder ────────────────────────────────────────────────────────────────
pub(crate) enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
pub(crate) fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
pub(crate) fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
+152
View File
@@ -0,0 +1,152 @@
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::debug;
use obipipeline::{
Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
make_sink, make_source, make_transform,
throttle,
};
use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer;
use obilayeredmap::{IndexMode, Layer};
use obiskio::{SKError, SKResult};
use crate::common::olm_to_sk;
// ── KmerGraphData ─────────────────────────────────────────────────────────────
enum KmerGraphData {
File(obipipeline::Throttled<PathBuf>),
RawBatch(Vec<CanonicalKmer>),
FilteredBatch(Vec<CanonicalKmer>),
}
// ── build_graph ───────────────────────────────────────────────────────────────
/// Phase 1: pipeline that reads files, filters kmers, pushes into a GraphDeBruijn.
///
/// `flat_fn(path, emit)`: opens path, iterates kmers, calls `emit(batch)` for each batch.
/// `filter(kmer) -> bool`: secondary filter applied in the Transform stage.
pub(crate) fn build_graph<I, F, G>(
file_source: I,
flat_fn: F,
filter: G,
n_workers: usize,
max_open: usize,
) -> SKResult<GraphDeBruijn>
where
I: Iterator<Item = PathBuf> + Send + 'static,
F: Fn(PathBuf, &mut dyn FnMut(Vec<CanonicalKmer>)) -> SKResult<()> + Send + Sync + 'static,
G: Fn(CanonicalKmer) -> bool + Send + Sync + 'static,
{
let capacity = 2;
let flat_fn = Arc::new(flat_fn);
let filter = Arc::new(filter);
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
let g_sink = Arc::clone(&g_shared);
let err_cap: Arc<Mutex<Option<SKError>>> = Arc::new(Mutex::new(None));
let err_flat = Arc::clone(&err_cap);
let throttled = throttle(file_source, max_open);
let pipeline = Pipeline::new(
make_source!(KmerGraphData, throttled, File),
vec![
Stage::Flat(Arc::new(
move |data: KmerGraphData,
push: &PipelineSender<Result<KmerGraphData, PipelineError>>,
delta: &PipelineSender<isize>|
{
if let KmerGraphData::File(t) = data {
let path = t.item;
let _guard = t.guard; // released at end of block
let mut count: isize = 0;
let push_clone = push.clone();
let result = flat_fn(path, &mut |batch: Vec<CanonicalKmer>| {
push_clone.send(Ok(KmerGraphData::RawBatch(batch))).ok();
count += 1;
});
match result {
Ok(()) => {
delta.send(count - 1).ok();
}
Err(e) => {
*err_flat.lock().unwrap() = Some(e);
delta.send(-1).ok();
}
}
}
},
) as SharedFlatFn<KmerGraphData>),
make_transform!(KmerGraphData, {
let filter = Arc::clone(&filter);
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
batch.into_iter().filter(|k| filter(*k)).collect()
}
}, RawBatch, FilteredBatch),
],
make_sink!(KmerGraphData, {
move |batch: Vec<CanonicalKmer>| {
let mut g = g_sink.lock().unwrap();
for kmer in batch {
g.push(kmer);
}
}
}, FilteredBatch),
);
WorkerPool::new(pipeline, n_workers, capacity).run();
if let Some(e) = Arc::try_unwrap(err_cap)
.unwrap_or_else(|_| panic!("build_graph: err_cap not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner())
{
return Err(e);
}
let g = Arc::try_unwrap(g_shared)
.unwrap_or_else(|_| panic!("build_graph: g_shared not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
Ok(g)
}
// ── write_graph_as_unitigs ────────────────────────────────────────────────────
/// Phase 2 (write unitigs only): compute degrees, write unitigs to `layer_dir`, drop graph.
///
/// Returns n_kmers. Does NOT build the MPHF — caller does it.
pub(crate) fn write_graph_as_unitigs(g: GraphDeBruijn, layer_dir: &Path) -> SKResult<usize> {
let n_kmers = g.len();
g.compute_degrees_and_mark_starts();
std::fs::create_dir_all(layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(layer_dir).map_err(|e| olm_to_sk(e, "graph pipeline"))?;
g.try_for_each_unitig(|unitig| uw.write(unitig))?;
uw.close()?;
drop(g);
Ok(n_kmers)
}
// ── materialize_layer ─────────────────────────────────────────────────────────
/// Phase 2 (full): write_graph_as_unitigs + `Layer::<()>::build`.
///
/// Returns n_kmers.
pub(crate) fn materialize_layer(
g: GraphDeBruijn,
layer_dir: &Path,
block_bits: u8,
evidence: &IndexMode,
) -> SKResult<usize> {
let n = write_graph_as_unitigs(g, layer_dir)?;
debug!("materialize_layer: unitigs written ({n} kmers), building MPHF");
Layer::<()>::build(layer_dir, block_bits, evidence)
.map_err(|e| olm_to_sk(e, "graph pipeline"))?;
debug!("materialize_layer: MPHF build done");
Ok(n)
}
+10 -27
View File
@@ -6,24 +6,16 @@ use epserde::prelude::*;
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, OLMError, layer::Layer}; use obilayeredmap::{IndexMode, layer::Layer};
use obiskio::{SKError, SKFileMeta, SKFileReader}; use obiskio::{SKError, SKFileMeta, SKFileReader};
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
use crate::common::olm_to_sk;
use crate::graph_pipeline::{materialize_layer, write_graph_as_unitigs};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>; type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(io_err) => SKError::Io(io_err),
other => SKError::InvalidData {
context: "layer build",
detail: other.to_string(),
},
}
}
fn remove_if_exists(path: &std::path::Path) { fn remove_if_exists(path: &std::path::Path) {
if let Err(e) = fs::remove_file(path) { if let Err(e) = fs::remove_file(path) {
if e.kind() != io::ErrorKind::NotFound { if e.kind() != io::ErrorKind::NotFound {
@@ -101,18 +93,8 @@ impl KmerPartition {
} }
} }
let n_kmers = g.len(); let n_kmers = if with_counts {
g.compute_degrees_and_mark_starts(); let n = write_graph_as_unitigs(g, &layer_dir)?;
fs::create_dir_all(&layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?;
g.try_for_each_unitig(|unitig| {
uw.write(unitig)
})?;
uw.close()?;
if with_counts {
Layer::<PersistentCompactIntMatrix>::build( Layer::<PersistentCompactIntMatrix>::build(
&layer_dir, &layer_dir,
block_bits, block_bits,
@@ -122,10 +104,11 @@ impl KmerPartition {
_ => 1, _ => 1,
}, },
) )
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "layer build"))?;
n
} else { } else {
Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?; materialize_layer(g, &layer_dir, block_bits, mode)?
} };
let index_dir = layer_dir.parent().expect("layer_dir has a parent"); let index_dir = layer_dir.parent().expect("layer_dir has a parent");
PartitionMeta { PartitionMeta {
@@ -133,7 +116,7 @@ impl KmerPartition {
mode: mode.clone(), mode: mode.clone(),
} }
.save(index_dir) .save(index_dir)
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "layer build"))?;
Ok(n_kmers) Ok(n_kmers)
} }
+2
View File
@@ -1,6 +1,8 @@
pub mod filter; pub mod filter;
mod common;
mod distance; mod distance;
mod dump_layer; mod dump_layer;
mod graph_pipeline;
mod index_layer; mod index_layer;
mod kmer_sort; mod kmer_sort;
mod merge_layer; mod merge_layer;
+36 -177
View File
@@ -1,5 +1,4 @@
use std::fs; use std::fs;
use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -14,12 +13,13 @@ use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
}; };
use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly};
use obiskio::{SKError, SKResult, UnitigFileReader}; use obiskio::{SKError, SKResult, UnitigFileReader};
use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta};
use crate::graph_pipeline::{build_graph, materialize_layer};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
// ── MergeMode ───────────────────────────────────────────────────────────────── // ── MergeMode ─────────────────────────────────────────────────────────────────
@@ -30,29 +30,6 @@ pub enum MergeMode {
Count, Count,
} }
// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ─────────────
enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
// ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── // ── SrcLayerData — opened source matrix for pass-2 lookup ─────────────────────
pub(crate) enum SrcLayerData { pub(crate) enum SrcLayerData {
@@ -66,18 +43,18 @@ impl SrcLayerData {
match merge_mode { match merge_mode {
MergeMode::Presence => { MergeMode::Presence => {
if counts_dir.exists() && !layer_dir.join("presence").exists() { if counts_dir.exists() && !layer_dir.join("presence").exists() {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Count(mphf, mat)) Ok(SrcLayerData::Count(mphf, mat))
} else { } else {
// presence dir exists, or neither exists → Implicit handled by open() // presence dir exists, or neither exists → Implicit handled by open()
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Presence(mphf, mat)) Ok(SrcLayerData::Presence(mphf, mat))
} }
} }
MergeMode::Count => { MergeMode::Count => {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?;
if counts_dir.exists() { if counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Count(mphf, mat)) Ok(SrcLayerData::Count(mphf, mat))
@@ -107,53 +84,6 @@ impl SrcLayerData {
const INDEX_SUBDIR: &str = "index"; const INDEX_SUBDIR: &str = "index";
/// Load PartitionMeta, or recover it by probing layer directories.
/// Indexes built before meta.json was introduced lack the file.
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) {
Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() {
n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(olm_to_sk)?;
Ok(m)
}
Err(e) => Err(olm_to_sk(e)),
}
}
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData {
context: "merge",
detail: other.to_string(),
},
}
}
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
}
// ── KmerPartition::merge_partition ──────────────────────────────────────────── // ── KmerPartition::merge_partition ────────────────────────────────────────────
impl KmerPartition { impl KmerPartition {
@@ -180,8 +110,8 @@ impl KmerPartition {
return Ok(0); return Ok(0);
} }
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open load_meta(&dst_index_dir, "merge")?; // ensure meta.json exists before LayeredMap::open
let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?); let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?);
let n_dst_layers = dst_map.n_layers(); let n_dst_layers = dst_map.n_layers();
let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum();
@@ -191,7 +121,7 @@ impl KmerPartition {
for l in 0..n_dst_layers { for l in 0..n_dst_layers {
let layer_dir = dst_index_dir.join(format!("layer_{l}")); let layer_dir = dst_index_dir.join(format!("layer_{l}"));
Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n()) Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n())
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "merge"))?;
} }
} }
@@ -209,7 +139,7 @@ impl KmerPartition {
if !src_index_dir.exists() { if !src_index_dir.exists() {
continue; continue;
} }
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir, "merge")?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin");
if p.exists() { if p.exists() {
@@ -221,92 +151,36 @@ impl KmerPartition {
let n_src_layers = unitig_paths.len(); let n_src_layers = unitig_paths.len();
debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)"); debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)");
enum Pass1Data {
File((PathBuf, ThrottleGuard)),
Batch(Vec<CanonicalKmer>),
NewKmers(Vec<CanonicalKmer>),
}
const BATCH: usize = 4096; const BATCH: usize = 4096;
let n_workers = rayon::current_num_threads().min(16).max(4); let n_workers = rayon::current_num_threads().min(16).max(4);
let capacity = 2;
// At most 2 files open simultaneously: keeps n_workers-2 workers free // At most 2 files open simultaneously: keeps n_workers-2 workers free
// for the Transform stage. Each open file monopolises one worker for the // for the Transform stage. Each open file monopolises one worker for the
// full duration of its read, so this must stay well below n_workers. // full duration of its read, so this must stay well below n_workers.
let max_open = 2; let max_open = 2;
let dst_filter = Arc::clone(&dst_map); let dst_filter = Arc::clone(&dst_map);
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
let g_sink = Arc::clone(&g_shared);
let pass1_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap = Arc::clone(&pass1_err);
let throttled_paths = throttle(unitig_paths.into_iter(), max_open); let g = build_graph(
unitig_paths.into_iter(),
let pipeline = Pipeline::new( move |path: PathBuf, emit: &mut dyn FnMut(Vec<CanonicalKmer>)| -> SKResult<()> {
make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File), let reader = UnitigFileReader::open_sequential(&path)?;
vec![ let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
Stage::Flat(Arc::new( for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
move |data: Pass1Data, batch.push(kmer);
push: &PipelineSender<Result<Pass1Data, PipelineError>>, if batch.len() == BATCH {
delta: &PipelineSender<isize>| emit(std::mem::replace(&mut batch, Vec::with_capacity(BATCH)));
{
if let Pass1Data::File((path, _guard)) = data {
// _guard is dropped at end of this block, releasing the slot.
let reader = match UnitigFileReader::open_sequential(&path) {
Ok(r) => r,
Err(e) => {
*err_cap.lock().unwrap() = Some(e.to_string());
delta.send(-1).ok();
return;
}
};
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
let mut count: isize = 0;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
batch.push(kmer);
if batch.len() == BATCH {
let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH));
push.send(Ok(Pass1Data::Batch(b))).ok();
count += 1;
}
}
if !batch.is_empty() {
push.send(Ok(Pass1Data::Batch(batch))).ok();
count += 1;
}
delta.send(count - 1).ok();
}
}
) as SharedFlatFn<Pass1Data>),
make_transform!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
batch.into_iter()
.filter(|&k| dst_filter.query(k).is_none())
.collect()
}
}, Batch, NewKmers),
],
make_sink!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| {
let mut g = g_sink.lock().unwrap();
for kmer in batch {
g.push(kmer);
} }
} }
}, NewKmers), if !batch.is_empty() {
); emit(batch);
}
Ok(())
},
move |kmer| dst_filter.query(kmer).is_none(),
n_workers,
max_open,
)?;
WorkerPool::new(pipeline, n_workers, capacity).run();
if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() {
return Err(SKError::InvalidData { context: "merge pass1", detail: msg });
}
let g = Arc::try_unwrap(g_shared)
.unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
let any_new = g.len() > 0; let any_new = g.len() > 0;
debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len()); debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len());
@@ -315,25 +189,8 @@ impl KmerPartition {
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
let n_new = if any_new { let n_new = if any_new {
let t_deg = std::time::Instant::now();
g.compute_degrees_and_mark_starts();
debug!("partition {i}: compute_degrees in {:.3}s — {} nodes",
t_deg.elapsed().as_secs_f64(), g.len());
fs::create_dir_all(&new_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
debug!("partition {i}: unitig traversal start — {} nodes", g.len()); debug!("partition {i}: unitig traversal start — {} nodes", g.len());
let mut n_unitigs = 0usize; let n_nodes = materialize_layer(g, &new_layer_dir, block_bits, evidence)?;
g.try_for_each_unitig(|unitig| {
n_unitigs += 1;
uw.write(unitig)
})?;
debug!("partition {i}: unitig writer closing");
uw.close()?;
let n_nodes = g.len();
debug!("partition {i}: unitig writer closed — dropping graph ({n_nodes} nodes)");
drop(g);
debug!("partition {i}: graph dropped — starting MPHF build ({n_unitigs} unitigs)");
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
debug!("partition {i}: MPHF build done"); debug!("partition {i}: MPHF build done");
n_nodes n_nodes
} else { } else {
@@ -343,7 +200,7 @@ impl KmerPartition {
let t_open = std::time::Instant::now(); let t_open = std::time::Instant::now();
let new_mphf: Option<Arc<MphfOnly>> = if any_new { let new_mphf: Option<Arc<MphfOnly>> = if any_new {
Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)) Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(|e| olm_to_sk(e, "merge"))?))
} else { } else {
None None
}; };
@@ -449,7 +306,7 @@ impl KmerPartition {
col_offset += src_n; col_offset += src_n;
continue; continue;
} }
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir, "merge")?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
if src_layer_dir.join("unitigs.bin").exists() { if src_layer_dir.join("unitigs.bin").exists() {
@@ -483,6 +340,7 @@ impl KmerPartition {
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap2 = Arc::clone(&pass2_err); let err_cap2 = Arc::clone(&pass2_err);
let capacity = 2;
let throttled_pass2 = throttle(pass2_items.into_iter(), max_open); let throttled_pass2 = throttle(pass2_items.into_iter(), max_open);
let pipeline2 = Pipeline::new( let pipeline2 = Pipeline::new(
@@ -516,6 +374,7 @@ impl KmerPartition {
return; return;
} }
}; };
const BATCH: usize = 4096;
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH); let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
let mut count: isize = 0; let mut count: isize = 0;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
@@ -617,9 +476,9 @@ impl KmerPartition {
write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total) write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total)
.map_err(SKError::Io)?; .map_err(SKError::Io)?;
let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?;
part_meta.n_layers = new_layer_idx + 1; part_meta.n_layers = new_layer_idx + 1;
part_meta.save(&dst_index_dir).map_err(olm_to_sk)?; part_meta.save(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?;
} }
debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64()); debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64());
+11 -91
View File
@@ -1,6 +1,4 @@
use std::fs; use std::path::Path;
use std::io;
use std::path::{Path, PathBuf};
use obicompactvec::{ use obicompactvec::{
PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder,
@@ -9,85 +7,17 @@ use obicompactvec::{
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; use obilayeredmap::{IndexMode, MphfLayer};
use obiskio::{SKError, SKResult, UnitigFileReader}; use obiskio::{SKError, SKResult, UnitigFileReader};
use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta};
use crate::filter::{KmerFilter, passes_all}; use crate::filter::{KmerFilter, passes_all};
use crate::graph_pipeline::materialize_layer;
use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::merge_layer::{MergeMode, SrcLayerData};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
const INDEX_SUBDIR: &str = "index"; const INDEX_SUBDIR: &str = "index";
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData {
context: "rebuild",
detail: other.to_string(),
},
}
}
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
}
// ── ColBuilder ────────────────────────────────────────────────────────────────
enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
// ── Helpers ───────────────────────────────────────────────────────────────────
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) {
Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() {
n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(olm_to_sk)?;
Ok(m)
}
Err(e) => Err(olm_to_sk(e)),
}
}
/// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`. /// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`.
/// ///
/// Uses [`SrcLayerData`] semantics: counts take priority over presence when /// Uses [`SrcLayerData`] semantics: counts take priority over presence when
@@ -99,7 +29,7 @@ fn iter_src_layers(
filters: &[Box<dyn KmerFilter>], filters: &[Box<dyn KmerFilter>],
mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), mut cb: impl FnMut(CanonicalKmer, Box<[u32]>),
) -> SKResult<()> { ) -> SKResult<()> {
let src_meta = load_meta(src_index_dir)?; let src_meta = load_meta(src_index_dir, "rebuild")?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let unitigs_path = src_layer_dir.join("unitigs.bin"); let unitigs_path = src_layer_dir.join("unitigs.bin");
@@ -144,7 +74,7 @@ impl KmerPartition {
return Ok(()); return Ok(());
} }
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir, "rebuild")?;
if src_meta.n_layers == 0 { if src_meta.n_layers == 0 {
return Ok(()); return Ok(());
} }
@@ -159,30 +89,20 @@ impl KmerPartition {
return Ok(()); return Ok(());
} }
let n_new = g.len();
g.compute_degrees_and_mark_starts();
// ── Build MPHF in dst layer_0 ───────────────────────────────────────── // ── Build MPHF in dst layer_0 ─────────────────────────────────────────
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
let dst_layer_dir = dst_index_dir.join("layer_0"); let dst_layer_dir = dst_index_dir.join("layer_0");
fs::create_dir_all(&dst_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; let n_new = materialize_layer(g, &dst_layer_dir, block_bits, &IndexMode::Exact)?;
g.try_for_each_unitig(|unitig| { let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact)
uw.write(unitig) .map_err(|e| olm_to_sk(e, "rebuild"))?;
})?;
uw.close()?;
drop(g);
Layer::<()>::build(&dst_layer_dir, block_bits, &IndexMode::Exact).map_err(olm_to_sk)?;
let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact).map_err(olm_to_sk)?;
// ── Prepare matrix builders (one column per genome) ─────────────────── // ── Prepare matrix builders (one column per genome) ───────────────────
let data_dir = match mode { let data_dir = match mode {
MergeMode::Presence => dst_layer_dir.join("presence"), MergeMode::Presence => dst_layer_dir.join("presence"),
MergeMode::Count => dst_layer_dir.join("counts"), MergeMode::Count => dst_layer_dir.join("counts"),
}; };
fs::create_dir_all(&data_dir)?; std::fs::create_dir_all(&data_dir)?;
let mut builders: Vec<ColBuilder> = match mode { let mut builders: Vec<ColBuilder> = match mode {
MergeMode::Presence => { MergeMode::Presence => {
@@ -234,7 +154,7 @@ impl KmerPartition {
mode: IndexMode::Exact, mode: IndexMode::Exact,
} }
.save(&dst_index_dir) .save(&dst_index_dir)
.map_err(olm_to_sk)?; .map_err(|e| olm_to_sk(e, "rebuild"))?;
Ok(()) Ok(())
} }