5c2f48535f
Renames `compute_degrees` to `compute_degrees_and_mark_starts` across the De Bruijn graph and partitioner layers to consolidate degree calculation and start-node flagging. Introduces safe neighbor iteration methods and a debug validation block to verify graph consistency. Refactors unitig extraction to use sequential execution with a `Mutex` for safe error propagation. Fixes malformed and duplicated method calls, adds auto-generation of missing `meta.json` files, and ensures persistent matrix builders are explicitly closed to finalize metadata.
243 lines
8.5 KiB
Rust
243 lines
8.5 KiB
Rust
use std::fs;
|
|
use std::io;
|
|
use std::path::{Path, PathBuf};
|
|
|
|
use obicompactvec::{
|
|
PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder,
|
|
PersistentCompactIntVecBuilder,
|
|
};
|
|
use obidebruinj::GraphDeBruijn;
|
|
use obikseq::CanonicalKmer;
|
|
use obilayeredmap::meta::PartitionMeta;
|
|
use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError};
|
|
use obiskio::{SKError, SKResult, UnitigFileReader};
|
|
|
|
use crate::filter::{KmerFilter, passes_all};
|
|
use crate::merge_layer::{MergeMode, SrcLayerData};
|
|
use crate::partition::KmerPartition;
|
|
|
|
/// Name of the sub-directory that is joined onto a partition directory
/// (`part_dir(i)`) to locate that partition's index.
const INDEX_SUBDIR: &str = "index";
|
|
|
|
fn olm_to_sk(e: OLMError) -> SKError {
|
|
match e {
|
|
OLMError::Io(e) => SKError::Io(e),
|
|
other => SKError::InvalidData {
|
|
context: "rebuild",
|
|
detail: other.to_string(),
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Returns the path of the bit-vector file for column `col` inside `dir`.
///
/// The file name is `col_`, the column index zero-padded to at least six
/// digits, and the `.pbiv` extension (for example `col_000003.pbiv`).
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
    dir.join(format!("col_{col:06}.pbiv"))
}
|
|
|
|
/// Returns the path of the compact-integer vector file for column `col` inside `dir`.
///
/// The file name is `col_`, the column index zero-padded to at least six
/// digits, and the `.pciv` extension (for example `col_000003.pciv`).
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
    dir.join(format!("col_{col:06}.pciv"))
}
|
|
|
|
/// Writes `meta.json` into `dir`, recording the dimensions of a column matrix.
///
/// The file contains a single JSON object followed by a newline:
/// `{"n":<n>,"n_cols":<n_cols>}`. `std::fs::write` is used, so an existing
/// `meta.json` has its contents replaced.
///
/// # Errors
///
/// Returns the I/O error raised while writing the file.
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
    let json = format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n");
    fs::write(dir.join("meta.json"), json)
}
|
|
|
|
// ── ColBuilder ────────────────────────────────────────────────────────────────
|
|
|
|
enum ColBuilder {
|
|
Bit(PersistentBitVecBuilder),
|
|
Int(PersistentCompactIntVecBuilder),
|
|
}
|
|
|
|
impl ColBuilder {
|
|
fn set_val(&mut self, slot: usize, value: u32) {
|
|
match self {
|
|
ColBuilder::Bit(b) => b.set(slot, value > 0),
|
|
ColBuilder::Int(b) => b.set(slot, value),
|
|
}
|
|
}
|
|
|
|
fn close(self) -> SKResult<()> {
|
|
match self {
|
|
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
|
|
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
|
|
match PartitionMeta::load(dir) {
|
|
Ok(m) => Ok(m),
|
|
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
|
|
{
|
|
let mut n = 0usize;
|
|
while dir.join(format!("layer_{n}")).exists() {
|
|
n += 1;
|
|
}
|
|
let m = PartitionMeta {
|
|
n_layers: n,
|
|
mode: IndexMode::default(),
|
|
};
|
|
m.save(dir).map_err(olm_to_sk)?;
|
|
Ok(m)
|
|
}
|
|
Err(e) => Err(olm_to_sk(e)),
|
|
}
|
|
}
|
|
|
|
/// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`.
|
|
///
|
|
/// Uses [`SrcLayerData`] semantics: counts take priority over presence when
|
|
/// `mode = Count`; presence (or implicit all-ones) is used for `Presence`.
|
|
fn iter_src_layers(
|
|
src_index_dir: &Path,
|
|
mode: MergeMode,
|
|
n_genomes: usize,
|
|
filters: &[Box<dyn KmerFilter>],
|
|
mut cb: impl FnMut(CanonicalKmer, Box<[u32]>),
|
|
) -> SKResult<()> {
|
|
let src_meta = load_meta(src_index_dir)?;
|
|
for l in 0..src_meta.n_layers {
|
|
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
|
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
|
if !unitigs_path.exists() {
|
|
continue;
|
|
}
|
|
|
|
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
|
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
|
|
|
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
|
let row = src_data.lookup(kmer, n_genomes);
|
|
if passes_all(filters, &row, n_genomes) {
|
|
cb(kmer, row.into_boxed_slice());
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
// ── KmerPartition::rebuild_partition ─────────────────────────────────────────
|
|
|
|
impl KmerPartition {
|
|
/// Rebuild partition `i` from `src` into `self` (an empty destination partition).
|
|
///
|
|
/// Only k-mers whose per-genome row passes all `filters` are written.
|
|
/// The output is a single-layer index — regardless of how many layers the
|
|
/// source has.
|
|
///
|
|
/// `n_genomes` is the number of genome columns in the source (and destination).
|
|
pub fn rebuild_partition(
|
|
&self,
|
|
src: &KmerPartition,
|
|
i: usize,
|
|
filters: &[Box<dyn KmerFilter>],
|
|
mode: MergeMode,
|
|
n_genomes: usize,
|
|
block_bits: u8,
|
|
) -> SKResult<()> {
|
|
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
|
|
if !src_index_dir.exists() {
|
|
return Ok(());
|
|
}
|
|
|
|
let src_meta = load_meta(&src_index_dir)?;
|
|
if src_meta.n_layers == 0 {
|
|
return Ok(());
|
|
}
|
|
|
|
// ── Pass 1: collect filtered kmers into de Bruijn graph ───────────────
|
|
let mut g = GraphDeBruijn::new();
|
|
iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, _row| {
|
|
g.push(kmer);
|
|
})?;
|
|
|
|
if g.len() == 0 {
|
|
return Ok(());
|
|
}
|
|
|
|
let n_new = g.len();
|
|
g.compute_degrees_and_mark_starts();
|
|
|
|
// ── Build MPHF in dst layer_0 ─────────────────────────────────────────
|
|
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
|
|
let dst_layer_dir = dst_index_dir.join("layer_0");
|
|
fs::create_dir_all(&dst_layer_dir)?;
|
|
|
|
let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?;
|
|
g.try_for_each_unitig(|nuc_iter| {
|
|
let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
|
|
uw.write(&unitig)
|
|
})?;
|
|
uw.close()?;
|
|
drop(g);
|
|
|
|
Layer::<()>::build(&dst_layer_dir, block_bits, &IndexMode::Exact).map_err(olm_to_sk)?;
|
|
let dst_mphf = MphfLayer::open(&dst_layer_dir, &IndexMode::Exact).map_err(olm_to_sk)?;
|
|
|
|
// ── Prepare matrix builders (one column per genome) ───────────────────
|
|
let data_dir = match mode {
|
|
MergeMode::Presence => dst_layer_dir.join("presence"),
|
|
MergeMode::Count => dst_layer_dir.join("counts"),
|
|
};
|
|
fs::create_dir_all(&data_dir)?;
|
|
|
|
let mut builders: Vec<ColBuilder> = match mode {
|
|
MergeMode::Presence => {
|
|
PersistentBitMatrixBuilder::new(n_new, &data_dir)
|
|
.map_err(SKError::Io)?
|
|
.close()
|
|
.map_err(SKError::Io)?;
|
|
(0..n_genomes)
|
|
.map(|g| -> SKResult<ColBuilder> {
|
|
let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?;
|
|
Ok(ColBuilder::Bit(b))
|
|
})
|
|
.collect::<SKResult<_>>()?
|
|
}
|
|
MergeMode::Count => {
|
|
PersistentCompactIntMatrixBuilder::new(n_new, &data_dir)
|
|
.map_err(SKError::Io)?
|
|
.close()
|
|
.map_err(SKError::Io)?;
|
|
(0..n_genomes)
|
|
.map(|g| -> SKResult<ColBuilder> {
|
|
let b = PersistentCompactIntVecBuilder::new(
|
|
n_new,
|
|
&col_path_int(&data_dir, g),
|
|
)?;
|
|
Ok(ColBuilder::Int(b))
|
|
})
|
|
.collect::<SKResult<_>>()?
|
|
}
|
|
};
|
|
|
|
// ── Pass 2: fill builders ─────────────────────────────────────────────
|
|
iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, row| {
|
|
if let Some(slot) = dst_mphf.find(kmer) {
|
|
for (col, &value) in row.iter().enumerate() {
|
|
builders[col].set_val(slot, value);
|
|
}
|
|
}
|
|
})?;
|
|
|
|
// ── Close builders, write metadata ────────────────────────────────────
|
|
for b in builders {
|
|
b.close()?;
|
|
}
|
|
write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?;
|
|
|
|
PartitionMeta {
|
|
n_layers: 1,
|
|
mode: IndexMode::Exact,
|
|
}
|
|
.save(&dst_index_dir)
|
|
.map_err(olm_to_sk)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|