diff --git a/src/obikpartitionner/src/common.rs b/src/obikpartitionner/src/common.rs
new file mode 100644
index 0000000..99e345e
--- /dev/null
+++ b/src/obikpartitionner/src/common.rs
@@ -0,0 +1,99 @@
+use std::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+use obicompactvec::{PersistentBitVecBuilder, PersistentCompactIntVecBuilder};
+use obilayeredmap::meta::PartitionMeta;
+use obilayeredmap::{IndexMode, OLMError};
+use obiskio::{SKError, SKResult};
+
+// ── olm_to_sk ────────────────────────────────────────────────────────────────
+
+/// Convert an [`OLMError`] into an [`SKError`].
+///
+/// I/O errors are passed through as `SKError::Io`; every other variant becomes
+/// `SKError::InvalidData` carrying `context` and the error's display text.
+pub(crate) fn olm_to_sk(e: OLMError, context: &'static str) -> SKError {
+    match e {
+        OLMError::Io(io_err) => SKError::Io(io_err),
+        other => SKError::InvalidData {
+            context,
+            detail: other.to_string(),
+        },
+    }
+}
+
+// ── load_meta ────────────────────────────────────────────────────────────────
+
+/// Load PartitionMeta, or recover it by probing layer directories.
+/// Indexes built before meta.json was introduced lack the file.
+///
+/// When `PartitionMeta::load` fails with a `NotFound` I/O error, consecutive
+/// `layer_0`, `layer_1`, … entries under `dir` are counted, a meta with that
+/// layer count and the default [`IndexMode`] is saved to `dir`, and that meta
+/// is returned. Any other error is converted with [`olm_to_sk`] and `context`.
+pub(crate) fn load_meta(dir: &Path, context: &'static str) -> SKResult<PartitionMeta> {
+    match PartitionMeta::load(dir) {
+        Ok(m) => Ok(m),
+        Err(OLMError::Io(ref io_e)) if io_e.kind() == io::ErrorKind::NotFound => {
+            // Count layer_0, layer_1, … until the first missing entry.
+            let mut n = 0usize;
+            while dir.join(format!("layer_{n}")).exists() {
+                n += 1;
+            }
+            let m = PartitionMeta {
+                n_layers: n,
+                mode: IndexMode::default(),
+            };
+            m.save(dir).map_err(|e| olm_to_sk(e, context))?;
+            Ok(m)
+        }
+        Err(e) => Err(olm_to_sk(e, context)),
+    }
+}
+
+// ── path helpers ──────────────────────────────────────────────────────────────
+
+/// Path of bit column `col` inside `dir`: `col_NNNNNN.pbiv` (zero-padded to 6 digits).
+pub(crate) fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
+    dir.join(format!("col_{col:06}.pbiv"))
+}
+
+/// Path of integer column `col` inside `dir`: `col_NNNNNN.pciv` (zero-padded to 6 digits).
+pub(crate) fn col_path_int(dir: &Path, col: usize) -> PathBuf {
+    dir.join(format!("col_{col:06}.pciv"))
+}
+
+/// Write `dir/meta.json` containing `{"n":<n>,"n_cols":<n_cols>}` and a trailing newline.
+pub(crate) fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
+    fs::write(
+        dir.join("meta.json"),
+        format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
+    )
+}
+
+// ── ColBuilder ────────────────────────────────────────────────────────────────
+
+/// Column builder that is either a bit vector or a compact integer vector.
+pub(crate) enum ColBuilder {
+    Bit(PersistentBitVecBuilder),
+    Int(PersistentCompactIntVecBuilder),
+}
+
+impl ColBuilder {
+    /// Store `value` at `slot`; the bit variant stores `value > 0`.
+    pub(crate) fn set_val(&mut self, slot: usize, value: u32) {
+        match self {
+            ColBuilder::Bit(b) => b.set(slot, value > 0),
+            ColBuilder::Int(b) => b.set(slot, value),
+        }
+    }
+
+    /// Close the underlying builder, mapping its I/O error to `SKError::Io`.
+    pub(crate) fn close(self) -> SKResult<()> {
+        match self {
+            ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
+            ColBuilder::Int(b) => b.close().map_err(SKError::Io),
+        }
+    }
+}
diff --git a/src/obikpartitionner/src/graph_pipeline.rs b/src/obikpartitionner/src/graph_pipeline.rs
new file mode 100644
index 0000000..c1c8302
--- /dev/null
+++ b/src/obikpartitionner/src/graph_pipeline.rs
@@ -0,0 +1,172 @@
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+
+use tracing::debug;
+use obipipeline::{
+    Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
+    make_sink, make_source, make_transform,
+    throttle,
+};
+
+use obidebruinj::GraphDeBruijn;
+use obikseq::CanonicalKmer;
+use obilayeredmap::{IndexMode, Layer};
+use obiskio::{SKError, SKResult};
+
+use crate::common::olm_to_sk;
+
+// ── KmerGraphData ─────────────────────────────────────────────────────────────
+
+/// Items flowing through the `build_graph` pipeline, one variant per stage.
+enum KmerGraphData {
+    File(obipipeline::Throttled<PathBuf>),
+    RawBatch(Vec<CanonicalKmer>),
+    FilteredBatch(Vec<CanonicalKmer>),
+}
+
+// ── build_graph ───────────────────────────────────────────────────────────────
+
+/// Phase 1: pipeline that reads files, filters kmers, pushes into a GraphDeBruijn.
+///
+/// `flat_fn(path, emit)`: opens path, iterates kmers, calls `emit(batch)` for each batch.
+/// `filter(kmer) -> bool`: secondary filter applied in the Transform stage.
+///
+/// `n_workers` is handed to the `WorkerPool`; `max_open` is handed to `throttle`
+/// over `file_source`.
+///
+/// # Errors
+/// If `flat_fn` fails for any file, the first recorded error is returned after
+/// the pipeline has finished running.
+///
+/// # Panics
+/// Panics if the shared graph or error slot is still referenced elsewhere once
+/// the pipeline has run.
+pub(crate) fn build_graph<I, F, G>(
+    file_source: I,
+    flat_fn: F,
+    filter: G,
+    n_workers: usize,
+    max_open: usize,
+) -> SKResult<GraphDeBruijn>
+where
+    I: Iterator<Item = PathBuf> + Send + 'static,
+    F: Fn(PathBuf, &mut dyn FnMut(Vec<CanonicalKmer>)) -> SKResult<()> + Send + Sync + 'static,
+    G: Fn(CanonicalKmer) -> bool + Send + Sync + 'static,
+{
+    let capacity = 2;
+
+    let flat_fn = Arc::new(flat_fn);
+    let filter = Arc::new(filter);
+    let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
+    let g_sink = Arc::clone(&g_shared);
+    let err_cap: Arc<Mutex<Option<SKError>>> = Arc::new(Mutex::new(None));
+    let err_flat = Arc::clone(&err_cap);
+
+    let throttled = throttle(file_source, max_open);
+
+    // NOTE(review): the sender and `SharedFlatFn` type arguments below are
+    // inferred from how the values are used — confirm against obipipeline.
+    let pipeline = Pipeline::new(
+        make_source!(KmerGraphData, throttled, File),
+        vec![
+            Stage::Flat(Arc::new(
+                move |data: KmerGraphData,
+                      push: &PipelineSender<Result<KmerGraphData, PipelineError>>,
+                      delta: &PipelineSender<isize>|
+                {
+                    if let KmerGraphData::File(t) = data {
+                        let path = t.item;
+                        let _guard = t.guard; // released at end of block
+                        let mut count: isize = 0;
+                        let push_clone = push.clone();
+                        let result = flat_fn(path, &mut |batch: Vec<CanonicalKmer>| {
+                            push_clone.send(Ok(KmerGraphData::RawBatch(batch))).ok();
+                            count += 1;
+                        });
+                        if let Err(e) = result {
+                            // Keep the first failure; tolerate a poisoned lock like
+                            // the unwrap sites after the pipeline do.
+                            let mut slot = err_flat.lock().unwrap_or_else(|p| p.into_inner());
+                            if slot.is_none() {
+                                *slot = Some(e);
+                            }
+                        }
+                        // One File item consumed, `count` batches pushed. Batches
+                        // emitted before a mid-file failure are already in flight,
+                        // so they are counted on the error path too (with no
+                        // batches this is still -1).
+                        // NOTE(review): assumes `delta` is the net change in
+                        // in-flight items — confirm against obipipeline.
+                        delta.send(count - 1).ok();
+                    }
+                },
+            ) as SharedFlatFn<KmerGraphData>),
+            make_transform!(KmerGraphData, {
+                let filter = Arc::clone(&filter);
+                move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
+                    batch.into_iter().filter(|k| filter(*k)).collect()
+                }
+            }, RawBatch, FilteredBatch),
+        ],
+        make_sink!(KmerGraphData, {
+            move |batch: Vec<CanonicalKmer>| {
+                let mut g = g_sink.lock().unwrap();
+                for kmer in batch {
+                    g.push(kmer);
+                }
+            }
+        }, FilteredBatch),
+    );
+
+    WorkerPool::new(pipeline, n_workers, capacity).run();
+
+    if let Some(e) = Arc::try_unwrap(err_cap)
+        .unwrap_or_else(|_| panic!("build_graph: err_cap not uniquely owned after pipeline"))
+        .into_inner()
+        .unwrap_or_else(|e| e.into_inner())
+    {
+        return Err(e);
+    }
+
+    let g = Arc::try_unwrap(g_shared)
+        .unwrap_or_else(|_| panic!("build_graph: g_shared not uniquely owned after pipeline"))
+        .into_inner()
+        .unwrap_or_else(|e| e.into_inner());
+
+    Ok(g)
+}
+
+// ── write_graph_as_unitigs ────────────────────────────────────────────────────
+
+/// Phase 2 (write unitigs only): compute degrees, write unitigs to `layer_dir`, drop graph.
+///
+/// Returns n_kmers. Does NOT build the MPHF — caller does it.
+pub(crate) fn write_graph_as_unitigs(g: GraphDeBruijn, layer_dir: &Path) -> SKResult<usize> {
+    let n_kmers = g.len();
+    g.compute_degrees_and_mark_starts();
+    std::fs::create_dir_all(layer_dir)?;
+    let mut uw = Layer::<()>::unitig_writer(layer_dir).map_err(|e| olm_to_sk(e, "graph pipeline"))?;
+    g.try_for_each_unitig(|unitig| uw.write(unitig))?;
+    uw.close()?;
+    drop(g);
+    Ok(n_kmers)
+}
+
+// ── materialize_layer ─────────────────────────────────────────────────────────
+
+/// Phase 2 (full): write_graph_as_unitigs + `Layer::<()>::build`.
+///
+/// Returns n_kmers.
+pub(crate) fn materialize_layer(
+    g: GraphDeBruijn,
+    layer_dir: &Path,
+    block_bits: u8,
+    evidence: &IndexMode,
+) -> SKResult<usize> {
+    let n = write_graph_as_unitigs(g, layer_dir)?;
+    debug!("materialize_layer: unitigs written ({n} kmers), building MPHF");
+    Layer::<()>::build(layer_dir, block_bits, evidence)
+        .map_err(|e| olm_to_sk(e, "graph pipeline"))?;
+    debug!("materialize_layer: MPHF build done");
+    Ok(n)
+}
diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index 8eed6db..6b48df5 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -6,24 +6,16 @@ use epserde::prelude::*; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obidebruinj::GraphDeBruijn; use obilayeredmap::meta::PartitionMeta; -use obilayeredmap::{IndexMode, OLMError, layer::Layer}; +use obilayeredmap::{IndexMode, layer::Layer}; use obiskio::{SKError, SKFileMeta, SKFileReader}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, 
hash::Xx64}; +use crate::common::olm_to_sk; +use crate::graph_pipeline::{materialize_layer, write_graph_as_unitigs}; use crate::partition::KmerPartition; type Mphf = PtrHash>, Xx64, Vec>; -fn olm_to_sk(e: OLMError) -> SKError { - match e { - OLMError::Io(io_err) => SKError::Io(io_err), - other => SKError::InvalidData { - context: "layer build", - detail: other.to_string(), - }, - } -} - fn remove_if_exists(path: &std::path::Path) { if let Err(e) = fs::remove_file(path) { if e.kind() != io::ErrorKind::NotFound { @@ -101,18 +93,8 @@ impl KmerPartition { } } - let n_kmers = g.len(); - g.compute_degrees_and_mark_starts(); - - fs::create_dir_all(&layer_dir)?; - - let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|unitig| { - uw.write(unitig) - })?; - uw.close()?; - - if with_counts { + let n_kmers = if with_counts { + let n = write_graph_as_unitigs(g, &layer_dir)?; Layer::::build( &layer_dir, block_bits, @@ -122,10 +104,11 @@ impl KmerPartition { _ => 1, }, ) - .map_err(olm_to_sk)?; + .map_err(|e| olm_to_sk(e, "layer build"))?; + n } else { - Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?; - } + materialize_layer(g, &layer_dir, block_bits, mode)? 
+ }; let index_dir = layer_dir.parent().expect("layer_dir has a parent"); PartitionMeta { @@ -133,7 +116,7 @@ impl KmerPartition { mode: mode.clone(), } .save(index_dir) - .map_err(olm_to_sk)?; + .map_err(|e| olm_to_sk(e, "layer build"))?; Ok(n_kmers) } diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index fe09264..a63e2c9 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,6 +1,8 @@ pub mod filter; +mod common; mod distance; mod dump_layer; +mod graph_pipeline; mod index_layer; mod kmer_sort; mod merge_layer; diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index d56e71a..0701b6d 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -1,5 +1,4 @@ use std::fs; -use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; @@ -14,12 +13,13 @@ use obicompactvec::{ PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, }; -use obidebruinj::GraphDeBruijn; use obikseq::CanonicalKmer; use obilayeredmap::meta::PartitionMeta; -use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; +use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly}; use obiskio::{SKError, SKResult, UnitigFileReader}; +use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta}; +use crate::graph_pipeline::{build_graph, materialize_layer}; use crate::partition::KmerPartition; // ── MergeMode ───────────────────────────────────────────────────────────────── @@ -30,29 +30,6 @@ pub enum MergeMode { Count, } -// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ───────────── - -enum ColBuilder { - Bit(PersistentBitVecBuilder), - Int(PersistentCompactIntVecBuilder), -} - -impl ColBuilder { - fn set_val(&mut self, slot: usize, value: u32) { - match 
self { - ColBuilder::Bit(b) => b.set(slot, value > 0), - ColBuilder::Int(b) => b.set(slot, value), - } - } - - fn close(self) -> SKResult<()> { - match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), - } - } -} - // ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── pub(crate) enum SrcLayerData { @@ -66,18 +43,18 @@ impl SrcLayerData { match merge_mode { MergeMode::Presence => { if counts_dir.exists() && !layer_dir.join("presence").exists() { - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } else { // presence dir exists, or neither exists → Implicit handled by open() - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?; let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Presence(mphf, mat)) } } MergeMode::Count => { - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(|e| olm_to_sk(e, "merge"))?; if counts_dir.exists() { let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) @@ -107,53 +84,6 @@ impl SrcLayerData { const INDEX_SUBDIR: &str = "index"; -/// Load PartitionMeta, or recover it by probing layer directories. -/// Indexes built before meta.json was introduced lack the file. 
-fn load_meta(dir: &Path) -> SKResult { - match PartitionMeta::load(dir) { - Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => - { - let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { - n += 1; - } - let m = PartitionMeta { - n_layers: n, - mode: IndexMode::default(), - }; - m.save(dir).map_err(olm_to_sk)?; - Ok(m) - } - Err(e) => Err(olm_to_sk(e)), - } -} - -fn olm_to_sk(e: OLMError) -> SKError { - match e { - OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { - context: "merge", - detail: other.to_string(), - }, - } -} - -fn col_path_bit(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pbiv")) -} - -fn col_path_int(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pciv")) -} - -fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write( - dir.join("meta.json"), - format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), - ) -} - // ── KmerPartition::merge_partition ──────────────────────────────────────────── impl KmerPartition { @@ -180,8 +110,8 @@ impl KmerPartition { return Ok(0); } - load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open - let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?); + load_meta(&dst_index_dir, "merge")?; // ensure meta.json exists before LayeredMap::open + let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?); let n_dst_layers = dst_map.n_layers(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); @@ -191,7 +121,7 @@ impl KmerPartition { for l in 0..n_dst_layers { let layer_dir = dst_index_dir.join(format!("layer_{l}")); Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n()) - .map_err(olm_to_sk)?; + .map_err(|e| olm_to_sk(e, "merge"))?; } } @@ -209,7 +139,7 @@ impl KmerPartition { if !src_index_dir.exists() { continue; } - let src_meta = 
load_meta(&src_index_dir)?; + let src_meta = load_meta(&src_index_dir, "merge")?; for l in 0..src_meta.n_layers { let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); if p.exists() { @@ -221,92 +151,36 @@ impl KmerPartition { let n_src_layers = unitig_paths.len(); debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)"); - enum Pass1Data { - File((PathBuf, ThrottleGuard)), - Batch(Vec), - NewKmers(Vec), - } - const BATCH: usize = 4096; let n_workers = rayon::current_num_threads().min(16).max(4); - let capacity = 2; // At most 2 files open simultaneously: keeps n_workers-2 workers free // for the Transform stage. Each open file monopolises one worker for the // full duration of its read, so this must stay well below n_workers. - let max_open = 2; + let max_open = 2; let dst_filter = Arc::clone(&dst_map); - let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new())); - let g_sink = Arc::clone(&g_shared); - let pass1_err: Arc>> = Arc::new(Mutex::new(None)); - let err_cap = Arc::clone(&pass1_err); - let throttled_paths = throttle(unitig_paths.into_iter(), max_open); - - let pipeline = Pipeline::new( - make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File), - vec![ - Stage::Flat(Arc::new( - move |data: Pass1Data, - push: &PipelineSender>, - delta: &PipelineSender| - { - if let Pass1Data::File((path, _guard)) = data { - // _guard is dropped at end of this block, releasing the slot. 
- let reader = match UnitigFileReader::open_sequential(&path) { - Ok(r) => r, - Err(e) => { - *err_cap.lock().unwrap() = Some(e.to_string()); - delta.send(-1).ok(); - return; - } - }; - let mut batch: Vec = Vec::with_capacity(BATCH); - let mut count: isize = 0; - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - batch.push(kmer); - if batch.len() == BATCH { - let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH)); - push.send(Ok(Pass1Data::Batch(b))).ok(); - count += 1; - } - } - if !batch.is_empty() { - push.send(Ok(Pass1Data::Batch(batch))).ok(); - count += 1; - } - delta.send(count - 1).ok(); - } - } - ) as SharedFlatFn), - make_transform!(Pass1Data, { - move |batch: Vec| -> Vec { - batch.into_iter() - .filter(|&k| dst_filter.query(k).is_none()) - .collect() - } - }, Batch, NewKmers), - ], - make_sink!(Pass1Data, { - move |batch: Vec| { - let mut g = g_sink.lock().unwrap(); - for kmer in batch { - g.push(kmer); + let g = build_graph( + unitig_paths.into_iter(), + move |path: PathBuf, emit: &mut dyn FnMut(Vec)| -> SKResult<()> { + let reader = UnitigFileReader::open_sequential(&path)?; + let mut batch: Vec = Vec::with_capacity(BATCH); + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + batch.push(kmer); + if batch.len() == BATCH { + emit(std::mem::replace(&mut batch, Vec::with_capacity(BATCH))); } } - }, NewKmers), - ); + if !batch.is_empty() { + emit(batch); + } + Ok(()) + }, + move |kmer| dst_filter.query(kmer).is_none(), + n_workers, + max_open, + )?; - WorkerPool::new(pipeline, n_workers, capacity).run(); - - if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() { - return Err(SKError::InvalidData { context: "merge pass1", detail: msg }); - } - - let g = Arc::try_unwrap(g_shared) - .unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline")) - .into_inner() - .unwrap_or_else(|e| e.into_inner()); let any_new = g.len() > 0; debug!("partition {i}: de Bruijn graph done — {} new kmers", 
g.len()); @@ -315,25 +189,8 @@ impl KmerPartition { let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let n_new = if any_new { - let t_deg = std::time::Instant::now(); - g.compute_degrees_and_mark_starts(); - debug!("partition {i}: compute_degrees in {:.3}s — {} nodes", - t_deg.elapsed().as_secs_f64(), g.len()); - fs::create_dir_all(&new_layer_dir)?; - let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; debug!("partition {i}: unitig traversal start — {} nodes", g.len()); - let mut n_unitigs = 0usize; - g.try_for_each_unitig(|unitig| { - n_unitigs += 1; - uw.write(unitig) - })?; - debug!("partition {i}: unitig writer closing"); - uw.close()?; - let n_nodes = g.len(); - debug!("partition {i}: unitig writer closed — dropping graph ({n_nodes} nodes)"); - drop(g); - debug!("partition {i}: graph dropped — starting MPHF build ({n_unitigs} unitigs)"); - Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; + let n_nodes = materialize_layer(g, &new_layer_dir, block_bits, evidence)?; debug!("partition {i}: MPHF build done"); n_nodes } else { @@ -343,7 +200,7 @@ impl KmerPartition { let t_open = std::time::Instant::now(); let new_mphf: Option> = if any_new { - Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)) + Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(|e| olm_to_sk(e, "merge"))?)) } else { None }; @@ -449,7 +306,7 @@ impl KmerPartition { col_offset += src_n; continue; } - let src_meta = load_meta(&src_index_dir)?; + let src_meta = load_meta(&src_index_dir, "merge")?; for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); if src_layer_dir.join("unitigs.bin").exists() { @@ -483,6 +340,7 @@ impl KmerPartition { let pass2_err: Arc>> = Arc::new(Mutex::new(None)); let err_cap2 = Arc::clone(&pass2_err); + let capacity = 2; let throttled_pass2 = throttle(pass2_items.into_iter(), max_open); let pipeline2 = Pipeline::new( @@ -516,6 +374,7 @@ impl 
KmerPartition { return; } }; + const BATCH: usize = 4096; let mut batch: Vec = Vec::with_capacity(BATCH); let mut count: isize = 0; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { @@ -617,9 +476,9 @@ impl KmerPartition { write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total) .map_err(SKError::Io)?; - let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; + let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?; part_meta.n_layers = new_layer_idx + 1; - part_meta.save(&dst_index_dir).map_err(olm_to_sk)?; + part_meta.save(&dst_index_dir).map_err(|e| olm_to_sk(e, "merge"))?; } debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64()); diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index ea09536..6bd40f3 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -1,6 +1,4 @@ -use std::fs; -use std::io; -use std::path::{Path, PathBuf}; +use std::path::Path; use obicompactvec::{ PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder, @@ -9,85 +7,17 @@ use obicompactvec::{ use obidebruinj::GraphDeBruijn; use obikseq::CanonicalKmer; use obilayeredmap::meta::PartitionMeta; -use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; +use obilayeredmap::{IndexMode, MphfLayer}; use obiskio::{SKError, SKResult, UnitigFileReader}; +use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta}; use crate::filter::{KmerFilter, passes_all}; +use crate::graph_pipeline::materialize_layer; use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::partition::KmerPartition; const INDEX_SUBDIR: &str = "index"; -fn olm_to_sk(e: OLMError) -> SKError { - match e { - OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { - context: "rebuild", - detail: other.to_string(), - }, - } -} - -fn 
col_path_bit(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pbiv")) -} - -fn col_path_int(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pciv")) -} - -fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write( - dir.join("meta.json"), - format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), - ) -} - -// ── ColBuilder ──────────────────────────────────────────────────────────────── - -enum ColBuilder { - Bit(PersistentBitVecBuilder), - Int(PersistentCompactIntVecBuilder), -} - -impl ColBuilder { - fn set_val(&mut self, slot: usize, value: u32) { - match self { - ColBuilder::Bit(b) => b.set(slot, value > 0), - ColBuilder::Int(b) => b.set(slot, value), - } - } - - fn close(self) -> SKResult<()> { - match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), - } - } -} - -// ── Helpers ─────────────────────────────────────────────────────────────────── - -fn load_meta(dir: &Path) -> SKResult { - match PartitionMeta::load(dir) { - Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => - { - let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { - n += 1; - } - let m = PartitionMeta { - n_layers: n, - mode: IndexMode::default(), - }; - m.save(dir).map_err(olm_to_sk)?; - Ok(m) - } - Err(e) => Err(olm_to_sk(e)), - } -} - /// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`. 
/// /// Uses [`SrcLayerData`] semantics: counts take priority over presence when @@ -99,7 +29,7 @@ fn iter_src_layers( filters: &[Box], mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), ) -> SKResult<()> { - let src_meta = load_meta(src_index_dir)?; + let src_meta = load_meta(src_index_dir, "rebuild")?; for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let unitigs_path = src_layer_dir.join("unitigs.bin"); @@ -144,7 +74,7 @@ impl KmerPartition { return Ok(()); } - let src_meta = load_meta(&src_index_dir)?; + let src_meta = load_meta(&src_index_dir, "rebuild")?; if src_meta.n_layers == 0 { return Ok(()); } @@ -159,30 +89,20 @@ impl KmerPartition { return Ok(()); } - let n_new = g.len(); - g.compute_degrees_and_mark_starts(); - // ── Build MPHF in dst layer_0 ───────────────────────────────────────── let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); let dst_layer_dir = dst_index_dir.join("layer_0"); - fs::create_dir_all(&dst_layer_dir)?; - let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|unitig| { - uw.write(unitig) - })?; - uw.close()?; - drop(g); - - Layer::<()>::build(&dst_layer_dir, block_bits, &IndexMode::Exact).map_err(olm_to_sk)?; - let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact).map_err(olm_to_sk)?; + let n_new = materialize_layer(g, &dst_layer_dir, block_bits, &IndexMode::Exact)?; + let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact) + .map_err(|e| olm_to_sk(e, "rebuild"))?; // ── Prepare matrix builders (one column per genome) ─────────────────── let data_dir = match mode { MergeMode::Presence => dst_layer_dir.join("presence"), MergeMode::Count => dst_layer_dir.join("counts"), }; - fs::create_dir_all(&data_dir)?; + std::fs::create_dir_all(&data_dir)?; let mut builders: Vec = match mode { MergeMode::Presence => { @@ -234,7 +154,7 @@ impl KmerPartition { mode: IndexMode::Exact, } .save(&dst_index_dir) - .map_err(olm_to_sk)?; + 
.map_err(|e| olm_to_sk(e, "rebuild"))?; Ok(()) }