Merge pull request 'Push ywwwypqxrtmy' (#14) from push-ywwwypqxrtmy into main

Reviewed-on: #14
This commit was merged in pull request #14.
This commit is contained in:
2026-06-03 13:18:41 +00:00
34 changed files with 1265 additions and 454 deletions
+2
View File
@@ -1507,6 +1507,7 @@ dependencies = [
"ndarray", "ndarray",
"obicompactvec", "obicompactvec",
"obikpartitionner", "obikpartitionner",
"obikseq",
"obilayeredmap", "obilayeredmap",
"obiskio", "obiskio",
"obisys", "obisys",
@@ -1659,6 +1660,7 @@ name = "obisys"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"libc", "libc",
"sysinfo",
] ]
[[package]] [[package]]
+240 -47
View File
@@ -1,22 +1,29 @@
use std::{fs, io, path::{Path, PathBuf}}; use std::fs::{self, File};
use std::io::{self, Write as _};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
use ndarray::{Array1, Array2}; use ndarray::{Array1, Array2};
use rayon::prelude::*; use rayon::prelude::*;
use crate::bitvec::{PersistentBitVec, PersistentBitVecBuilder}; use crate::bitvec::{PersistentBitVec, PersistentBitVecBuilder};
use crate::layer_meta::LayerMeta;
use crate::meta::MatrixMeta; use crate::meta::MatrixMeta;
fn col_path(dir: &Path, col: usize) -> PathBuf { fn col_path(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv")) dir.join(format!("col_{col:06}.pbiv"))
} }
pub struct PersistentBitMatrix { // ── ColumnarBitMatrix ─────────────────────────────────────────────────────────
/// Per-column file layout (original format).
pub struct ColumnarBitMatrix {
cols: Vec<PersistentBitVec>, cols: Vec<PersistentBitVec>,
n: usize, n: usize,
} }
impl PersistentBitMatrix { impl ColumnarBitMatrix {
pub fn open(dir: &Path) -> io::Result<Self> { pub(crate) fn open(dir: &Path) -> io::Result<Self> {
let meta = MatrixMeta::load(dir)?; let meta = MatrixMeta::load(dir)?;
let cols = (0..meta.n_cols) let cols = (0..meta.n_cols)
.map(|c| PersistentBitVec::open(&col_path(dir, c))) .map(|c| PersistentBitVec::open(&col_path(dir, c)))
@@ -24,16 +31,21 @@ impl PersistentBitMatrix {
Ok(Self { cols, n: meta.n }) Ok(Self { cols, n: meta.n })
} }
pub fn n(&self) -> usize { self.n } pub(crate) fn n(&self) -> usize { self.n }
pub fn n_cols(&self) -> usize { self.cols.len() } pub(crate) fn n_cols(&self) -> usize { self.cols.len() }
pub fn col(&self, c: usize) -> &PersistentBitVec { &self.cols[c] } pub(crate) fn col(&self, c: usize) -> &PersistentBitVec { &self.cols[c] }
pub fn row(&self, slot: usize) -> Box<[bool]> { pub(crate) fn row(&self, slot: usize) -> Box<[bool]> {
self.cols.iter().map(|c| c.get(slot)).collect() self.cols.iter().map(|c| c.get(slot)).collect()
} }
/// Returns the number of set bits in each column as `Array1<u64>`. pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) {
pub fn count_ones(&self) -> Array1<u64> { for (c, col) in self.cols.iter().enumerate() {
buf[c] = col.get(slot) as u32;
}
}
pub(crate) fn count_ones(&self) -> Array1<u64> {
let counts: Vec<u64> = (0..self.n_cols()) let counts: Vec<u64> = (0..self.n_cols())
.into_par_iter() .into_par_iter()
.map(|c| self.col(c).count_ones()) .map(|c| self.col(c).count_ones())
@@ -41,21 +53,15 @@ impl PersistentBitMatrix {
Array1::from_vec(counts) Array1::from_vec(counts)
} }
// ── Distance matrices ───────────────────────────────────────────────────── pub(crate) fn jaccard_dist_matrix(&self) -> Array2<f64> {
pub fn jaccard_dist_matrix(&self) -> Array2<f64> {
self.pairwise_f64(|i, j| self.col(i).jaccard_dist(self.col(j))) self.pairwise_f64(|i, j| self.col(i).jaccard_dist(self.col(j)))
} }
pub fn hamming_dist_matrix(&self) -> Array2<u64> { pub(crate) fn hamming_dist_matrix(&self) -> Array2<u64> {
self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j))) self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j)))
} }
// ── Partial matrices (additively decomposable across layers) ────────────── pub(crate) fn partial_jaccard_dist_matrix(&self) -> (Array2<u64>, Array2<u64>) {
/// Returns `(inter[n×n], union[n×n])`.
/// Reduce across layers by element-wise addition before computing `1 - inter/union`.
pub fn partial_jaccard_dist_matrix(&self) -> (Array2<u64>, Array2<u64>) {
let n = self.n_cols(); let n = self.n_cols();
let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n) let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n)
.into_par_iter() .into_par_iter()
@@ -64,7 +70,6 @@ impl PersistentBitMatrix {
(i, j, inter, union) (i, j, inter, union)
}) })
.collect(); .collect();
let mut inter_m = Array2::zeros((n, n)); let mut inter_m = Array2::zeros((n, n));
let mut union_m = Array2::zeros((n, n)); let mut union_m = Array2::zeros((n, n));
for (i, j, inter, union) in results { for (i, j, inter, union) in results {
@@ -74,14 +79,10 @@ impl PersistentBitMatrix {
(inter_m, union_m) (inter_m, union_m)
} }
/// Returns `hamming[n×n]` (number of differing bits per pair). pub(crate) fn partial_hamming_dist_matrix(&self) -> Array2<u64> {
/// Additive across layers — reduce by element-wise addition.
pub fn partial_hamming_dist_matrix(&self) -> Array2<u64> {
self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j))) self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j)))
} }
// ── Private helpers ───────────────────────────────────────────────────────
fn pairwise_f64(&self, f: impl Fn(usize, usize) -> f64 + Sync) -> Array2<f64> { fn pairwise_f64(&self, f: impl Fn(usize, usize) -> f64 + Sync) -> Array2<f64> {
let n = self.n_cols(); let n = self.n_cols();
let results: Vec<(usize, usize, f64)> = upper_pairs(n) let results: Vec<(usize, usize, f64)> = upper_pairs(n)
@@ -99,17 +100,8 @@ impl PersistentBitMatrix {
.collect(); .collect();
fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)))
} }
}
// ── Column append ───────────────────────────────────────────────────────────── pub(crate) fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> {
impl PersistentBitMatrix {
/// Append a new column to an existing matrix on disk.
///
/// Reads `meta.json` to obtain `n` and the current column count, writes
/// `col_{n_cols:06}.pbiv` filled by `value_of(slot)`, then increments
/// `n_cols` in `meta.json`.
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> {
let mut meta = MatrixMeta::load(dir)?; let mut meta = MatrixMeta::load(dir)?;
let mut b = PersistentBitVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?; let mut b = PersistentBitVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?;
for slot in 0..meta.n { for slot in 0..meta.n {
@@ -121,20 +113,208 @@ impl PersistentBitMatrix {
} }
} }
fn upper_pairs(n: usize) -> Vec<(usize, usize)> { // ── PackedBitMatrix ───────────────────────────────────────────────────────────
(0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect()
const PBMX_MAGIC: [u8; 4] = *b"PBMX";
const PBMX_HEADER: usize = 24; // magic(4) + pad(4) + n_rows(8) + n_cols(8)
const PBIV_HEADER: usize = 16; // magic(4) + pad(4) + n(8)
/// Single-file packed layout: all columns concatenated behind a header.
pub struct PackedBitMatrix {
mmap: Mmap,
n_rows: usize,
n_cols: usize,
/// Absolute byte offset to the start of each column's bit data
/// (= file offset of the PBIV blob + PBIV_HEADER).
data_offsets: Vec<usize>,
} }
fn fill_symmetric<T>(n: usize, vals: impl Iterator<Item = (usize, usize, T, T)>) -> Array2<T> impl PackedBitMatrix {
where pub(crate) fn open(path: &Path) -> io::Result<Self> {
T: Clone + Default, let mmap = unsafe { Mmap::map(&File::open(path)?)? };
{ if mmap.len() < PBMX_HEADER {
let mut m = Array2::from_elem((n, n), T::default()); return Err(io::Error::new(io::ErrorKind::InvalidData, "PBMX file too short"));
for (i, j, vij, vji) in vals { }
m[[i, j]] = vij; if &mmap[0..4] != &PBMX_MAGIC {
m[[j, i]] = vji; return Err(io::Error::new(io::ErrorKind::InvalidData, "bad PBMX magic"));
}
let n_rows = u64::from_le_bytes(mmap[8..16].try_into().unwrap()) as usize;
let n_cols = u64::from_le_bytes(mmap[16..24].try_into().unwrap()) as usize;
let mut data_offsets = Vec::with_capacity(n_cols);
for c in 0..n_cols {
let off_pos = PBMX_HEADER + c * 8;
let col_file_off = u64::from_le_bytes(mmap[off_pos..off_pos+8].try_into().unwrap()) as usize;
data_offsets.push(col_file_off + PBIV_HEADER);
}
Ok(Self { mmap, n_rows, n_cols, data_offsets })
}
#[inline]
pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) {
for (c, &data_off) in self.data_offsets.iter().enumerate() {
buf[c] = ((self.mmap[data_off + (slot >> 3)] >> (slot & 7)) & 1) as u32;
}
}
pub(crate) fn row(&self, slot: usize) -> Box<[bool]> {
(0..self.n_cols).map(|c| {
(self.mmap[self.data_offsets[c] + (slot >> 3)] >> (slot & 7)) & 1 != 0
}).collect()
}
}
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols)
.map(|c| fs::read(col_path(dir, c)))
.collect::<io::Result<_>>()?;
let header_size = PBMX_HEADER + n_cols * 8;
let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files {
offsets.push(col_offset as u64);
col_offset += data.len();
}
let packed_path = dir.join("matrix.pbmx");
let mut file = File::create(&packed_path)?;
file.write_all(&PBMX_MAGIC)?;
file.write_all(&[0u8; 4])?;
file.write_all(&(meta.n as u64).to_le_bytes())?;
file.write_all(&(n_cols as u64).to_le_bytes())?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; }
for data in &col_files { file.write_all(data)?; }
Ok(())
}
// ── PersistentBitMatrix — public enum ────────────────────────────────────────
/// Bit matrix that transparently handles columnar, packed, and implicit formats.
///
/// - `Columnar`: per-column `.pbiv` files (original format, used during build)
/// - `Packed`: single `matrix.pbmx` file (optimised for query — one `mmap`)
/// - `Implicit`: no file — all values are 1 (mono-genome presence/absence)
pub enum PersistentBitMatrix {
Columnar(ColumnarBitMatrix),
Packed(PackedBitMatrix),
Implicit { n_rows: usize, n_cols: usize },
}
impl PersistentBitMatrix {
/// Open from `layer_dir`, auto-detecting the format.
///
/// Checks (in order):
/// 1. `layer_dir/presence/matrix.pbmx` → Packed
/// 2. `layer_dir/presence/meta.json` → Columnar
/// 3. `layer_dir/layer_meta.json` → Implicit (new index)
/// 4. `layer_dir/unitigs.bin` → Implicit with warning (old index)
pub fn open(layer_dir: &Path) -> io::Result<Self> {
let presence_dir = layer_dir.join("presence");
if presence_dir.join("matrix.pbmx").exists() {
return Ok(Self::Packed(PackedBitMatrix::open(&presence_dir.join("matrix.pbmx"))?));
}
if MatrixMeta::load(&presence_dir).is_ok() {
return Ok(Self::Columnar(ColumnarBitMatrix::open(&presence_dir)?));
}
// No presence matrix → Implicit; requires layer_meta.json
let meta = LayerMeta::load(layer_dir).map_err(|_| io::Error::new(
io::ErrorKind::NotFound,
format!(
"no presence matrix and no layer_meta.json in {} — run 'obikmer upgrade'",
layer_dir.display()
),
))?;
Ok(Self::Implicit { n_rows: meta.n, n_cols: 1 })
}
pub fn n(&self) -> usize {
match self {
Self::Columnar(m) => m.n(),
Self::Packed(m) => m.n_rows,
Self::Implicit { n_rows, .. } => *n_rows,
}
}
pub fn n_cols(&self) -> usize {
match self {
Self::Columnar(m) => m.n_cols(),
Self::Packed(m) => m.n_cols,
Self::Implicit { n_cols, .. } => *n_cols,
}
}
pub fn col(&self, c: usize) -> &PersistentBitVec {
match self {
Self::Columnar(m) => m.col(c),
_ => panic!("col() only available on Columnar PersistentBitMatrix"),
}
}
pub fn row(&self, slot: usize) -> Box<[bool]> {
match self {
Self::Columnar(m) => m.row(slot),
Self::Packed(m) => m.row(slot),
Self::Implicit { n_cols, .. } => vec![true; *n_cols].into_boxed_slice(),
}
}
/// Fill `buf[i]` with `col_i[slot]` as 0/1 u32, without allocating.
pub fn fill_row(&self, slot: usize, buf: &mut [u32]) {
match self {
Self::Columnar(m) => m.fill_row(slot, buf),
Self::Packed(m) => m.fill_row(slot, buf),
Self::Implicit { n_cols, .. } => buf[..*n_cols].fill(1),
}
}
pub fn count_ones(&self) -> Array1<u64> {
match self {
Self::Columnar(m) => m.count_ones(),
_ => panic!("count_ones() only available on Columnar PersistentBitMatrix"),
}
}
pub fn jaccard_dist_matrix(&self) -> Array2<f64> {
match self {
Self::Columnar(m) => m.jaccard_dist_matrix(),
_ => panic!("jaccard_dist_matrix() only available on Columnar PersistentBitMatrix"),
}
}
pub fn hamming_dist_matrix(&self) -> Array2<u64> {
match self {
Self::Columnar(m) => m.hamming_dist_matrix(),
_ => panic!("hamming_dist_matrix() only available on Columnar PersistentBitMatrix"),
}
}
pub fn partial_jaccard_dist_matrix(&self) -> (Array2<u64>, Array2<u64>) {
match self {
Self::Columnar(m) => m.partial_jaccard_dist_matrix(),
_ => panic!("partial_jaccard_dist_matrix() only available on Columnar PersistentBitMatrix"),
}
}
pub fn partial_hamming_dist_matrix(&self) -> Array2<u64> {
match self {
Self::Columnar(m) => m.partial_hamming_dist_matrix(),
_ => panic!("partial_hamming_dist_matrix() only available on Columnar PersistentBitMatrix"),
}
}
/// Append a new column to an on-disk Columnar matrix.
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> {
ColumnarBitMatrix::append_column(dir, value_of)
} }
m
} }
// ── Trait impls ─────────────────────────────────────────────────────────────── // ── Trait impls ───────────────────────────────────────────────────────────────
@@ -154,7 +334,7 @@ impl BitPartials for PersistentBitMatrix {
} }
} }
// ── Builder ─────────────────────────────────────────────────────────────────── // ── Builder (unchanged — always builds Columnar) ──────────────────────────────
pub struct PersistentBitMatrixBuilder { pub struct PersistentBitMatrixBuilder {
dir: PathBuf, dir: PathBuf,
@@ -181,3 +361,16 @@ impl PersistentBitMatrixBuilder {
MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir) MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir)
} }
} }
// ── Helpers ───────────────────────────────────────────────────────────────────
fn upper_pairs(n: usize) -> Vec<(usize, usize)> {
(0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect()
}
fn fill_symmetric<T>(n: usize, vals: impl Iterator<Item = (usize, usize, T, T)>) -> Array2<T>
where T: Clone + Default {
let mut m = Array2::from_elem((n, n), T::default());
for (i, j, vij, vji) in vals { m[[i, j]] = vij; m[[j, i]] = vji; }
m
}
+312 -137
View File
@@ -1,9 +1,14 @@
use std::{fs, io, path::{Path, PathBuf}}; use std::cmp::Ordering;
use std::fs::{self, File};
use std::io::{self, Write as _};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
use ndarray::{Array1, Array2}; use ndarray::{Array1, Array2};
use rayon::prelude::*; use rayon::prelude::*;
use crate::builder::PersistentCompactIntVecBuilder; use crate::builder::PersistentCompactIntVecBuilder;
use crate::format::{HEADER_SIZE, INDEX_ENTRY_SIZE, OVERFLOW_ENTRY_SIZE};
use crate::meta::MatrixMeta; use crate::meta::MatrixMeta;
use crate::reader::PersistentCompactIntVec; use crate::reader::PersistentCompactIntVec;
@@ -11,13 +16,15 @@ fn col_path(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv")) dir.join(format!("col_{col:06}.pciv"))
} }
pub struct PersistentCompactIntMatrix { // ── ColumnarCompactIntMatrix ──────────────────────────────────────────────────
pub struct ColumnarCompactIntMatrix {
cols: Vec<PersistentCompactIntVec>, cols: Vec<PersistentCompactIntVec>,
n: usize, n: usize,
} }
impl PersistentCompactIntMatrix { impl ColumnarCompactIntMatrix {
pub fn open(dir: &Path) -> io::Result<Self> { pub(crate) fn open(dir: &Path) -> io::Result<Self> {
let meta = MatrixMeta::load(dir)?; let meta = MatrixMeta::load(dir)?;
let cols = (0..meta.n_cols) let cols = (0..meta.n_cols)
.map(|c| PersistentCompactIntVec::open(&col_path(dir, c))) .map(|c| PersistentCompactIntVec::open(&col_path(dir, c)))
@@ -25,17 +32,29 @@ impl PersistentCompactIntMatrix {
Ok(Self { cols, n: meta.n }) Ok(Self { cols, n: meta.n })
} }
pub fn n(&self) -> usize { self.n } pub(crate) fn n(&self) -> usize { self.n }
pub fn n_cols(&self) -> usize { self.cols.len() } pub(crate) fn n_cols(&self) -> usize { self.cols.len() }
pub fn col(&self, c: usize) -> &PersistentCompactIntVec { &self.cols[c] } pub(crate) fn col(&self, c: usize) -> &PersistentCompactIntVec { &self.cols[c] }
pub fn row(&self, slot: usize) -> Box<[u32]> { pub(crate) fn row(&self, slot: usize) -> Box<[u32]> {
self.cols.iter().map(|c| c.get(slot)).collect() self.cols.iter().map(|c| c.get(slot)).collect()
} }
// ── Distance matrices ───────────────────────────────────────────────────── pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) {
for (c, col) in self.cols.iter().enumerate() {
buf[c] = col.get(slot);
}
}
pub fn bray_dist_matrix(&self) -> Array2<f64> { pub(crate) fn sum(&self) -> Array1<u64> {
let sums: Vec<u64> = (0..self.n_cols())
.into_par_iter()
.map(|c| self.col(c).sum())
.collect();
Array1::from_vec(sums)
}
pub(crate) fn bray_dist_matrix(&self) -> Array2<f64> {
let sum_min = self.partial_bray_dist_matrix(); let sum_min = self.partial_bray_dist_matrix();
let col_sums = self.sum(); let col_sums = self.sum();
let n = self.n_cols(); let n = self.n_cols();
@@ -52,63 +71,19 @@ impl PersistentCompactIntMatrix {
m m
} }
pub fn relfreq_bray_dist_matrix(&self) -> Array2<f64> { pub(crate) fn partial_bray_dist_matrix(&self) -> Array2<u64> {
self.pairwise(|i, j| self.col(i).relfreq_bray_dist(self.col(j)))
}
pub fn euclidean_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).euclidean_dist(self.col(j)))
}
pub fn relfreq_euclidean_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).relfreq_euclidean_dist(self.col(j)))
}
pub fn hellinger_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).hellinger_dist(self.col(j)))
}
pub fn jaccard_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).jaccard_dist(self.col(j)))
}
pub fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).threshold_jaccard_dist(self.col(j), threshold))
}
/// Returns the sum of each column as `Array1<u64>`.
pub fn sum(&self) -> Array1<u64> {
let sums: Vec<u64> = (0..self.n_cols())
.into_par_iter()
.map(|c| self.col(c).sum())
.collect();
Array1::from_vec(sums)
}
// ── Partial matrices (additively decomposable across layers) ──────────────
/// Returns `sum_min[n×n]` where `sum_min[i,j]` = Σ_slot min(col_i[slot], col_j[slot]).
/// The denominator `col_sums[i] + col_sums[j]` is obtained from `self.sum()`.
/// Additive across layers by element-wise addition.
pub fn partial_bray_dist_matrix(&self) -> Array2<u64> {
self.pairwise_u64(|i, j| self.col(i).partial_bray_dist(self.col(j))) self.pairwise_u64(|i, j| self.col(i).partial_bray_dist(self.col(j)))
} }
/// Returns sum of squared differences `[n×n]`. pub(crate) fn partial_euclidean_dist_matrix(&self) -> Array2<f64> {
/// Reduce across layers by element-wise addition, then take `sqrt` for the final distance.
pub fn partial_euclidean_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).partial_euclidean_dist(self.col(j))) self.pairwise(|i, j| self.col(i).partial_euclidean_dist(self.col(j)))
} }
/// Returns `(inter[n×n], union[n×n])` for threshold-Jaccard. pub(crate) fn partial_threshold_jaccard_dist_matrix(
/// Reduce across layers by element-wise addition before computing `1 - inter/union`. &self, threshold: u32,
pub fn partial_threshold_jaccard_dist_matrix(
&self,
threshold: u32,
) -> (Array2<u64>, Array2<u64>) { ) -> (Array2<u64>, Array2<u64>) {
let n = self.n_cols(); let n = self.n_cols();
let pairs = upper_pairs(n); let pairs = upper_pairs(n);
let results: Vec<(usize, usize, u64, u64)> = pairs let results: Vec<(usize, usize, u64, u64)> = pairs
.into_par_iter() .into_par_iter()
.map(|(i, j)| { .map(|(i, j)| {
@@ -117,7 +92,6 @@ impl PersistentCompactIntMatrix {
(i, j, inter, union) (i, j, inter, union)
}) })
.collect(); .collect();
let mut inter_m = Array2::zeros((n, n)); let mut inter_m = Array2::zeros((n, n));
let mut union_m = Array2::zeros((n, n)); let mut union_m = Array2::zeros((n, n));
for (i, j, inter, union) in results { for (i, j, inter, union) in results {
@@ -127,99 +101,299 @@ impl PersistentCompactIntMatrix {
(inter_m, union_m) (inter_m, union_m)
} }
/// Returns matrix of `Σ_slot min(col_i[slot]/sum_i, col_j[slot]/sum_j)` per pair. pub(crate) fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
/// `col_sums` must be the GLOBAL sums across all layers/partitions.
/// Reduce across layers by element-wise addition; final distance = `1 - value`.
pub fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
self.pairwise(|i, j| { self.pairwise(|i, j| {
self.col(i).partial_relfreq_bray_dist( self.col(i).partial_relfreq_bray_dist(self.col(j), col_sums[i] as f64, col_sums[j] as f64)
self.col(j),
col_sums[i] as f64,
col_sums[j] as f64,
)
}) })
} }
/// Returns matrix of `Σ_slot (col_i[slot]/sum_i - col_j[slot]/sum_j)²` per pair. pub(crate) fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
/// `col_sums` must be the GLOBAL sums across all layers/partitions.
/// Reduce across layers by element-wise addition; final distance = `sqrt(value)`.
pub fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
self.pairwise(|i, j| { self.pairwise(|i, j| {
self.col(i).partial_relfreq_euclidean_dist( self.col(i).partial_relfreq_euclidean_dist(self.col(j), col_sums[i] as f64, col_sums[j] as f64)
self.col(j),
col_sums[i] as f64,
col_sums[j] as f64,
)
}) })
} }
/// Returns matrix of `Σ_slot (√(col_i/sum_i) - √(col_j/sum_j))²` per pair. pub(crate) fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
/// `col_sums` must be the GLOBAL sums across all layers/partitions.
/// Reduce across layers by element-wise addition; final distance = `sqrt(value) / √2`.
pub fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
self.pairwise(|i, j| { self.pairwise(|i, j| {
self.col(i).partial_hellinger_euclidean_dist( self.col(i).partial_hellinger_euclidean_dist(self.col(j), col_sums[i] as f64, col_sums[j] as f64)
self.col(j),
col_sums[i] as f64,
col_sums[j] as f64,
)
}) })
} }
// ── Private helpers ─────────────────────────────────────────────────────── pub(crate) fn relfreq_bray_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).relfreq_bray_dist(self.col(j)))
}
pub(crate) fn euclidean_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).euclidean_dist(self.col(j)))
}
pub(crate) fn relfreq_euclidean_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).relfreq_euclidean_dist(self.col(j)))
}
pub(crate) fn hellinger_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).hellinger_dist(self.col(j)))
}
pub(crate) fn jaccard_dist_matrix(&self) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).jaccard_dist(self.col(j)))
}
pub(crate) fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2<f64> {
self.pairwise(|i, j| self.col(i).threshold_jaccard_dist(self.col(j), threshold))
}
pub(crate) fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> {
let mut meta = MatrixMeta::load(dir)?;
let mut b = PersistentCompactIntVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?;
for slot in 0..meta.n { b.set(slot, value_of(slot)); }
b.close()?;
meta.n_cols += 1;
meta.save(dir)
}
fn pairwise(&self, f: impl Fn(usize, usize) -> f64 + Sync) -> Array2<f64> { fn pairwise(&self, f: impl Fn(usize, usize) -> f64 + Sync) -> Array2<f64> {
let n = self.n_cols(); let n = self.n_cols();
let results: Vec<(usize, usize, f64)> = upper_pairs(n) let results: Vec<(usize, usize, f64)> = upper_pairs(n)
.into_par_iter() .into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect();
.map(|(i, j)| (i, j, f(i, j)))
.collect();
fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)))
} }
fn pairwise_u64(&self, f: impl Fn(usize, usize) -> u64 + Sync) -> Array2<u64> { fn pairwise_u64(&self, f: impl Fn(usize, usize) -> u64 + Sync) -> Array2<u64> {
let n = self.n_cols(); let n = self.n_cols();
let results: Vec<(usize, usize, u64)> = upper_pairs(n) let results: Vec<(usize, usize, u64)> = upper_pairs(n)
.into_par_iter() .into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect();
.map(|(i, j)| (i, j, f(i, j)))
.collect();
fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)))
} }
} }
fn upper_pairs(n: usize) -> Vec<(usize, usize)> { // ── PackedCompactIntMatrix ────────────────────────────────────────────────────
(0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect()
const PCMX_MAGIC: [u8; 4] = *b"PCMX";
const PCMX_HEADER: usize = 24; // magic(4) + pad(4) + n_rows(8) + n_cols(8)
/// Per-column metadata pre-parsed from the embedded PCIV header.
struct ColInfo {
primary_start: usize, // absolute mmap offset to primary array
data_offset: usize, // absolute mmap offset to overflow array
n_overflow: usize,
step: usize,
index: Vec<(usize, usize)>,
} }
fn fill_symmetric<T>(n: usize, vals: impl Iterator<Item = (usize, usize, T, T)>) -> Array2<T> pub struct PackedCompactIntMatrix {
where mmap: Mmap,
T: Clone + Default, n_rows: usize,
{ n_cols: usize,
let mut m = Array2::from_elem((n, n), T::default()); columns: Vec<ColInfo>,
for (i, j, vij, vji) in vals {
m[[i, j]] = vij;
m[[j, i]] = vji;
}
m
} }
// ── Column append ───────────────────────────────────────────────────────────── impl PackedCompactIntMatrix {
pub(crate) fn open(path: &Path) -> io::Result<Self> {
let mmap = unsafe { Mmap::map(&File::open(path)?)? };
if mmap.len() < PCMX_HEADER {
return Err(io::Error::new(io::ErrorKind::InvalidData, "PCMX file too short"));
}
if &mmap[0..4] != &PCMX_MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData, "bad PCMX magic"));
}
let n_rows = u64::from_le_bytes(mmap[8..16].try_into().unwrap()) as usize;
let n_cols = u64::from_le_bytes(mmap[16..24].try_into().unwrap()) as usize;
let mut columns = Vec::with_capacity(n_cols);
for c in 0..n_cols {
let off_pos = PCMX_HEADER + c * 8;
let col_base = u64::from_le_bytes(mmap[off_pos..off_pos+8].try_into().unwrap()) as usize;
// Parse embedded PCIV header at col_base
let n_ov = u64::from_le_bytes(mmap[col_base+16..col_base+24].try_into().unwrap()) as usize;
let n_idx = u64::from_le_bytes(mmap[col_base+24..col_base+32].try_into().unwrap()) as usize;
let step = u64::from_le_bytes(mmap[col_base+32..col_base+40].try_into().unwrap()) as usize;
let n_pciv = u64::from_le_bytes(mmap[col_base+8..col_base+16].try_into().unwrap()) as usize;
let primary_start = col_base + HEADER_SIZE;
let data_offset = primary_start + n_pciv;
let index_offset = data_offset + n_ov * OVERFLOW_ENTRY_SIZE;
let mut index = Vec::with_capacity(n_idx);
for i in 0..n_idx {
let ioff = index_offset + i * INDEX_ENTRY_SIZE;
let slot = u64::from_le_bytes(mmap[ioff..ioff+8].try_into().unwrap()) as usize;
let pos = u64::from_le_bytes(mmap[ioff+8..ioff+16].try_into().unwrap()) as usize;
index.push((slot, pos));
}
columns.push(ColInfo { primary_start, data_offset, n_overflow: n_ov, step, index });
}
Ok(Self { mmap, n_rows, n_cols, columns })
}
#[inline]
pub(crate) fn get(&self, col: usize, slot: usize) -> u32 {
let ci = &self.columns[col];
let v = self.mmap[ci.primary_start + slot];
if v < 255 { return v as u32; }
self.overflow_get(ci, slot)
}
fn overflow_get(&self, ci: &ColInfo, slot: usize) -> u32 {
let (pos_start, pos_end) = if ci.step == 0 {
(0, ci.n_overflow)
} else {
let i = ci.index.partition_point(|&(s, _)| s <= slot).saturating_sub(1);
let start = ci.index[i].1;
let end = if i + 1 < ci.index.len() { ci.index[i+1].1 } else { ci.n_overflow };
(start, end)
};
let mut lo = pos_start;
let mut hi = pos_end;
while lo < hi {
let mid = lo + (hi - lo) / 2;
let off = ci.data_offset + mid * OVERFLOW_ENTRY_SIZE;
let stored = u64::from_le_bytes(self.mmap[off..off+8].try_into().unwrap()) as usize;
match stored.cmp(&slot) {
Ordering::Equal => return u32::from_le_bytes(self.mmap[off+8..off+12].try_into().unwrap()),
Ordering::Less => lo = mid + 1,
Ordering::Greater => hi = mid,
}
}
panic!("slot {slot} marked overflow but not found")
}
pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) {
for c in 0..self.n_cols { buf[c] = self.get(c, slot); }
}
pub(crate) fn row(&self, slot: usize) -> Box<[u32]> {
(0..self.n_cols).map(|c| self.get(c, slot)).collect()
}
}
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols)
.map(|c| fs::read(col_path(dir, c)))
.collect::<io::Result<_>>()?;
let header_size = PCMX_HEADER + n_cols * 8;
let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files {
offsets.push(col_offset as u64);
col_offset += data.len();
}
let packed_path = dir.join("matrix.pcmx");
let mut file = File::create(&packed_path)?;
file.write_all(&PCMX_MAGIC)?;
file.write_all(&[0u8; 4])?;
file.write_all(&(meta.n as u64).to_le_bytes())?;
file.write_all(&(n_cols as u64).to_le_bytes())?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; }
for data in &col_files { file.write_all(data)?; }
Ok(())
}
// ── PersistentCompactIntMatrix — public enum ──────────────────────────────────
pub enum PersistentCompactIntMatrix {
Columnar(ColumnarCompactIntMatrix),
Packed(PackedCompactIntMatrix),
}
impl PersistentCompactIntMatrix { impl PersistentCompactIntMatrix {
/// Append a new column to an existing matrix on disk. /// Open from `layer_dir`, auto-detecting Packed or Columnar.
/// pub fn open(layer_dir: &Path) -> io::Result<Self> {
/// Reads `meta.json` to obtain `n` and the current column count, writes let counts_dir = layer_dir.join("counts");
/// `col_{n_cols:06}.pciv` filled by `value_of(slot)`, then increments
/// `n_cols` in `meta.json`. if counts_dir.join("matrix.pcmx").exists() {
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> { return Ok(Self::Packed(PackedCompactIntMatrix::open(&counts_dir.join("matrix.pcmx"))?));
let mut meta = MatrixMeta::load(dir)?;
let mut b = PersistentCompactIntVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?;
for slot in 0..meta.n {
b.set(slot, value_of(slot));
} }
b.close()?;
meta.n_cols += 1; if MatrixMeta::load(&counts_dir).is_ok() {
meta.save(dir) return Ok(Self::Columnar(ColumnarCompactIntMatrix::open(&counts_dir)?));
}
Err(io::Error::new(
io::ErrorKind::NotFound,
format!("no count matrix found in {} — run 'obikmer upgrade'", layer_dir.display()),
))
}
pub fn n(&self) -> usize {
match self { Self::Columnar(m) => m.n(), Self::Packed(m) => m.n_rows }
}
pub fn n_cols(&self) -> usize {
match self { Self::Columnar(m) => m.n_cols(), Self::Packed(m) => m.n_cols }
}
pub fn col(&self, c: usize) -> &PersistentCompactIntVec {
match self {
Self::Columnar(m) => m.col(c),
_ => panic!("col() only available on Columnar PersistentCompactIntMatrix"),
}
}
pub fn row(&self, slot: usize) -> Box<[u32]> {
match self { Self::Columnar(m) => m.row(slot), Self::Packed(m) => m.row(slot) }
}
pub fn fill_row(&self, slot: usize, buf: &mut [u32]) {
match self { Self::Columnar(m) => m.fill_row(slot, buf), Self::Packed(m) => m.fill_row(slot, buf) }
}
pub fn sum(&self) -> Array1<u64> {
match self {
Self::Columnar(m) => m.sum(),
_ => panic!("sum() only available on Columnar PersistentCompactIntMatrix"),
}
}
pub fn bray_dist_matrix(&self) -> Array2<f64> {
match self { Self::Columnar(m) => m.bray_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn relfreq_bray_dist_matrix(&self) -> Array2<f64> {
match self { Self::Columnar(m) => m.relfreq_bray_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn euclidean_dist_matrix(&self) -> Array2<f64> {
match self { Self::Columnar(m) => m.euclidean_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn relfreq_euclidean_dist_matrix(&self) -> Array2<f64> {
match self { Self::Columnar(m) => m.relfreq_euclidean_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn hellinger_dist_matrix(&self) -> Array2<f64> {
match self { Self::Columnar(m) => m.hellinger_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn jaccard_dist_matrix(&self) -> Array2<f64> {
match self { Self::Columnar(m) => m.jaccard_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2<f64> {
match self { Self::Columnar(m) => m.threshold_jaccard_dist_matrix(threshold), _ => panic!("Columnar only") }
}
pub fn partial_bray_dist_matrix(&self) -> Array2<u64> {
match self { Self::Columnar(m) => m.partial_bray_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn partial_euclidean_dist_matrix(&self) -> Array2<f64> {
match self { Self::Columnar(m) => m.partial_euclidean_dist_matrix(), _ => panic!("Columnar only") }
}
pub fn partial_threshold_jaccard_dist_matrix(&self, threshold: u32) -> (Array2<u64>, Array2<u64>) {
match self { Self::Columnar(m) => m.partial_threshold_jaccard_dist_matrix(threshold), _ => panic!("Columnar only") }
}
pub fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
match self { Self::Columnar(m) => m.partial_relfreq_bray_dist_matrix(col_sums), _ => panic!("Columnar only") }
}
pub fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
match self { Self::Columnar(m) => m.partial_relfreq_euclidean_dist_matrix(col_sums), _ => panic!("Columnar only") }
}
pub fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1<u64>) -> Array2<f64> {
match self { Self::Columnar(m) => m.partial_hellinger_euclidean_dist_matrix(col_sums), _ => panic!("Columnar only") }
}
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> {
ColumnarCompactIntMatrix::append_column(dir, value_of)
} }
} }
@@ -232,24 +406,12 @@ impl ColumnWeights for PersistentCompactIntMatrix {
} }
impl CountPartials for PersistentCompactIntMatrix { impl CountPartials for PersistentCompactIntMatrix {
fn partial_bray(&self) -> Array2<u64> { fn partial_bray(&self) -> Array2<u64> { self.partial_bray_dist_matrix() }
self.partial_bray_dist_matrix() fn partial_euclidean(&self) -> Array2<f64> { self.partial_euclidean_dist_matrix() }
} fn partial_threshold_jaccard(&self, t: u32) -> (Array2<u64>, Array2<u64>) { self.partial_threshold_jaccard_dist_matrix(t) }
fn partial_euclidean(&self) -> Array2<f64> { fn partial_relfreq_bray(&self, g: &Array1<u64>) -> Array2<f64> { self.partial_relfreq_bray_dist_matrix(g) }
self.partial_euclidean_dist_matrix() fn partial_relfreq_euclidean(&self, g: &Array1<u64>) -> Array2<f64> { self.partial_relfreq_euclidean_dist_matrix(g) }
} fn partial_hellinger(&self, g: &Array1<u64>) -> Array2<f64> { self.partial_hellinger_euclidean_dist_matrix(g) }
fn partial_threshold_jaccard(&self, threshold: u32) -> (Array2<u64>, Array2<u64>) {
self.partial_threshold_jaccard_dist_matrix(threshold)
}
fn partial_relfreq_bray(&self, global: &Array1<u64>) -> Array2<f64> {
self.partial_relfreq_bray_dist_matrix(global)
}
fn partial_relfreq_euclidean(&self, global: &Array1<u64>) -> Array2<f64> {
self.partial_relfreq_euclidean_dist_matrix(global)
}
fn partial_hellinger(&self, global: &Array1<u64>) -> Array2<f64> {
self.partial_hellinger_euclidean_dist_matrix(global)
}
} }
// ── Builder ─────────────────────────────────────────────────────────────────── // ── Builder ───────────────────────────────────────────────────────────────────
@@ -279,3 +441,16 @@ impl PersistentCompactIntMatrixBuilder {
MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir) MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir)
} }
} }
// ── Helpers ───────────────────────────────────────────────────────────────────
fn upper_pairs(n: usize) -> Vec<(usize, usize)> {
(0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect()
}
fn fill_symmetric<T>(n: usize, vals: impl Iterator<Item = (usize, usize, T, T)>) -> Array2<T>
where T: Clone + Default {
let mut m = Array2::from_elem((n, n), T::default());
for (i, j, vij, vji) in vals { m[[i, j]] = vij; m[[j, i]] = vji; }
m
}
+33
View File
@@ -0,0 +1,33 @@
use std::{fs, io, path::Path};
/// Lightweight metadata stored at the layer level (`layer_meta.json`).
///
/// Written by `obilayeredmap::MphfLayer::build` alongside `mphf.bin`.
/// Read by `PersistentBitMatrix::open` to determine `n_rows` for the
/// implicit (mono-genome presence/absence) case.
pub struct LayerMeta {
pub n: usize,
}
impl LayerMeta {
pub const FILENAME: &'static str = "layer_meta.json";
pub fn save(layer_dir: &Path, n: usize) -> io::Result<()> {
fs::write(layer_dir.join(Self::FILENAME), format!("{{\"n\":{n}}}\n"))
}
pub fn load(layer_dir: &Path) -> io::Result<Self> {
let s = fs::read_to_string(layer_dir.join(Self::FILENAME))?;
Self::parse(&s)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "bad layer_meta.json"))
}
fn parse(s: &str) -> Option<Self> {
let key = "\"n\":";
let pos = s.find(key)? + key.len();
let rest = s[pos..].trim_start();
let end = rest.find(|c: char| !c.is_ascii_digit()).unwrap_or(rest.len());
let n = rest[..end].parse().ok()?;
Some(Self { n })
}
}
+4 -2
View File
@@ -3,14 +3,16 @@ mod bitmatrix;
mod builder; mod builder;
mod format; mod format;
mod intmatrix; mod intmatrix;
mod layer_meta;
mod meta; mod meta;
mod reader; mod reader;
pub mod traits; pub mod traits;
pub use bitvec::{BitIter, PersistentBitVec, PersistentBitVecBuilder}; pub use bitvec::{BitIter, PersistentBitVec, PersistentBitVecBuilder};
pub use bitmatrix::{PersistentBitMatrix, PersistentBitMatrixBuilder}; pub use bitmatrix::{PersistentBitMatrix, PersistentBitMatrixBuilder, pack_bit_matrix};
pub use builder::PersistentCompactIntVecBuilder; pub use builder::PersistentCompactIntVecBuilder;
pub use intmatrix::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder}; pub use intmatrix::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, pack_compact_int_matrix};
pub use layer_meta::LayerMeta;
pub use reader::PersistentCompactIntVec; pub use reader::PersistentCompactIntVec;
pub use traits::{BitPartials, ColumnWeights, CountPartials}; pub use traits::{BitPartials, ColumnWeights, CountPartials};
+13 -8
View File
@@ -5,7 +5,8 @@ use crate::{PersistentBitMatrix, PersistentBitMatrixBuilder};
fn make_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) { fn make_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
let n = cols.first().map_or(0, |c| c.len()); let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(n, dir.path()).unwrap(); let presence = dir.path().join("presence");
let mut b = PersistentBitMatrixBuilder::new(n, &presence).unwrap();
for &col in cols { for &col in cols {
let mut cb = b.add_col().unwrap(); let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { for (slot, &v) in col.iter().enumerate() {
@@ -21,7 +22,8 @@ fn make_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
#[test] #[test]
fn single_col_roundtrip() { fn single_col_roundtrip() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(4, dir.path()).unwrap(); let presence = dir.path().join("presence");
let mut b = PersistentBitMatrixBuilder::new(4, &presence).unwrap();
let mut col = b.add_col().unwrap(); let mut col = b.add_col().unwrap();
col.set(0, true); col.set(0, true);
col.set(1, false); col.set(1, false);
@@ -42,7 +44,8 @@ fn single_col_roundtrip() {
#[test] #[test]
fn two_cols_roundtrip() { fn two_cols_roundtrip() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(3, dir.path()).unwrap(); let presence = dir.path().join("presence");
let mut b = PersistentBitMatrixBuilder::new(3, &presence).unwrap();
let mut col0 = b.add_col().unwrap(); let mut col0 = b.add_col().unwrap();
col0.set(0, true); col0.set(1, false); col0.set(2, true); col0.set(0, true); col0.set(1, false); col0.set(2, true);
col0.close().unwrap(); col0.close().unwrap();
@@ -61,7 +64,8 @@ fn two_cols_roundtrip() {
#[test] #[test]
fn col_accessor() { fn col_accessor() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(3, dir.path()).unwrap(); let presence = dir.path().join("presence");
let mut b = PersistentBitMatrixBuilder::new(3, &presence).unwrap();
let mut col = b.add_col().unwrap(); let mut col = b.add_col().unwrap();
col.set(0, true); col.set(1, false); col.set(2, true); col.set(0, true); col.set(1, false); col.set(2, true);
col.close().unwrap(); col.close().unwrap();
@@ -76,7 +80,8 @@ fn col_accessor() {
#[test] #[test]
fn zero_cols_roundtrip() { fn zero_cols_roundtrip() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let b = PersistentBitMatrixBuilder::new(8, dir.path()).unwrap(); let presence = dir.path().join("presence");
let b = PersistentBitMatrixBuilder::new(8, &presence).unwrap();
b.close().unwrap(); b.close().unwrap();
let m = PersistentBitMatrix::open(dir.path()).unwrap(); let m = PersistentBitMatrix::open(dir.path()).unwrap();
@@ -89,9 +94,9 @@ fn zero_cols_roundtrip() {
#[test] #[test]
fn count_ones_per_column() { fn count_ones_per_column() {
let (_d, m) = make_matrix(&[ let (_d, m) = make_matrix(&[
&[true, false, true, true], // 3 ones &[true, false, true, true],
&[false, false, false, false], // 0 ones &[false, false, false, false],
&[true, true, true, false], // 3 ones &[true, true, true, false],
]); ]);
let c = m.count_ones(); let c = m.count_ones();
assert_eq!(c[0], 3); assert_eq!(c[0], 3);
+5 -5
View File
@@ -5,7 +5,7 @@ use crate::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder};
fn make_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) { fn make_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
let n = cols.first().map_or(0, |c| c.len()); let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(n, dir.path()).unwrap(); let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
for &col in cols { for &col in cols {
let mut cb = b.add_col().unwrap(); let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { for (slot, &v) in col.iter().enumerate() {
@@ -21,7 +21,7 @@ fn make_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatri
#[test] #[test]
fn single_col_roundtrip() { fn single_col_roundtrip() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(4, dir.path()).unwrap(); let mut b = PersistentCompactIntMatrixBuilder::new(4, &dir.path().join("counts")).unwrap();
let mut col = b.add_col().unwrap(); let mut col = b.add_col().unwrap();
col.set(0, 10); col.set(0, 10);
col.set(1, 200); col.set(1, 200);
@@ -42,7 +42,7 @@ fn single_col_roundtrip() {
#[test] #[test]
fn two_cols_roundtrip() { fn two_cols_roundtrip() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(3, dir.path()).unwrap(); let mut b = PersistentCompactIntMatrixBuilder::new(3, &dir.path().join("counts")).unwrap();
let mut col0 = b.add_col().unwrap(); let mut col0 = b.add_col().unwrap();
col0.set(0, 1); col0.set(1, 2); col0.set(2, 3); col0.set(0, 1); col0.set(1, 2); col0.set(2, 3);
col0.close().unwrap(); col0.close().unwrap();
@@ -61,7 +61,7 @@ fn two_cols_roundtrip() {
#[test] #[test]
fn col_accessor() { fn col_accessor() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(2, dir.path()).unwrap(); let mut b = PersistentCompactIntMatrixBuilder::new(2, &dir.path().join("counts")).unwrap();
let mut col0 = b.add_col().unwrap(); let mut col0 = b.add_col().unwrap();
col0.set(0, 5); col0.set(1, 7); col0.set(0, 5); col0.set(1, 7);
col0.close().unwrap(); col0.close().unwrap();
@@ -75,7 +75,7 @@ fn col_accessor() {
#[test] #[test]
fn zero_cols_roundtrip() { fn zero_cols_roundtrip() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let b = PersistentCompactIntMatrixBuilder::new(10, dir.path()).unwrap(); let b = PersistentCompactIntMatrixBuilder::new(10, &dir.path().join("counts")).unwrap();
b.close().unwrap(); b.close().unwrap();
let m = PersistentCompactIntMatrix::open(dir.path()).unwrap(); let m = PersistentCompactIntMatrix::open(dir.path()).unwrap();
+1
View File
@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
obikseq = { path = "../obikseq" }
obikpartitionner = { path = "../obikpartitionner" } obikpartitionner = { path = "../obikpartitionner" }
obiskio = { path = "../obiskio" } obiskio = { path = "../obiskio" }
obisys = { path = "../obisys" } obisys = { path = "../obisys" }
+88
View File
@@ -6,10 +6,13 @@ use std::sync::{Arc, Mutex};
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use obikpartitionner::{KmerPartition, KmerSpectrum}; use obikpartitionner::{KmerPartition, KmerSpectrum};
use obilayeredmap;
use obisys::{Reporter, Stage}; use obisys::{Reporter, Stage};
use rayon::prelude::*; use rayon::prelude::*;
use tracing::info; use tracing::info;
use obikseq::{set_k, set_m};
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
use crate::meta::{GenomeInfo, IndexConfig, IndexMeta}; use crate::meta::{GenomeInfo, IndexConfig, IndexMeta};
use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED}; use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
@@ -39,6 +42,8 @@ impl KmerIndex {
config.minimizer_size, config.minimizer_size,
force, force,
)?; )?;
set_k(config.kmer_size);
set_m(config.minimizer_size);
let mut meta = IndexMeta::new(config); let mut meta = IndexMeta::new(config);
if let Some(info) = genome_info { if let Some(info) = genome_info {
meta.genomes.push(info); meta.genomes.push(info);
@@ -50,6 +55,8 @@ impl KmerIndex {
pub fn open<P: AsRef<Path>>(path: P) -> OKIResult<Self> { pub fn open<P: AsRef<Path>>(path: P) -> OKIResult<Self> {
let root_path = path.as_ref().to_owned(); let root_path = path.as_ref().to_owned();
let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?; let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?;
set_k(meta.config.kmer_size);
set_m(meta.config.minimizer_size);
let partition = KmerPartition::open_with_config( let partition = KmerPartition::open_with_config(
&root_path, &root_path,
meta.config.kmer_size, meta.config.kmer_size,
@@ -199,6 +206,87 @@ impl KmerIndex {
.join(format!("layer_{layer}")) .join(format!("layer_{layer}"))
.join("unitigs.bin") .join("unitigs.bin")
} }
/// Pack all partition matrices into single-file format (presence → .pbmx, counts → .pcmx).
///
/// Reduces per-query file-open overhead from O(n_genomes) to O(1) per partition.
/// Column files are kept in place; packed files take priority when opening.
pub fn pack_matrices(&self) -> OKIResult<()> {
use obicompactvec::{pack_bit_matrix, pack_compact_int_matrix};
use obilayeredmap::meta::PartitionMeta;
let n = self.n_partitions();
let errors: Vec<_> = (0..n)
.into_par_iter()
.filter_map(|i| {
let index_dir = self.partition.part_dir(i).join("index");
if !index_dir.exists() { return None; }
let meta = match PartitionMeta::load(&index_dir) {
Ok(m) => m,
Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
};
for l in 0..meta.n_layers {
let layer_dir = index_dir.join(format!("layer_{l}"));
let presence_dir = layer_dir.join("presence");
let counts_dir = layer_dir.join("counts");
if presence_dir.exists() {
if let Err(e) = pack_bit_matrix(&presence_dir) {
return Some(OKIError::Io(e));
}
}
if counts_dir.exists() {
if let Err(e) = pack_compact_int_matrix(&counts_dir) {
return Some(OKIError::Io(e));
}
}
}
None
})
.collect();
if let Some(e) = errors.into_iter().next() { return Err(e); }
Ok(())
}
/// Write a `layer_meta.json` in any layer directory that is missing one.
///
/// Old indexes were built before this file was required. The number of
/// kmers is recovered from `unitigs.bin`, which is always present.
pub fn upgrade_layer_meta(&self) -> OKIResult<()> {
use obicompactvec::LayerMeta;
use obiskio::UnitigFileReader;
use obilayeredmap::meta::PartitionMeta;
let n = self.n_partitions();
let errors: Vec<_> = (0..n)
.into_par_iter()
.filter_map(|i| {
let index_dir = self.partition.part_dir(i).join("index");
if !index_dir.exists() { return None; }
let meta = match PartitionMeta::load(&index_dir) {
Ok(m) => m,
Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
};
for l in 0..meta.n_layers {
let layer_dir = index_dir.join(format!("layer_{l}"));
let meta_path = layer_dir.join(LayerMeta::FILENAME);
if meta_path.exists() { continue; }
let unitigs_path = layer_dir.join("unitigs.bin");
let n_kmers = match UnitigFileReader::open_sequential(&unitigs_path) {
Ok(r) => r.n_kmers(),
Err(e) => return Some(OKIError::Partition(e)),
};
if let Err(e) = LayerMeta::save(&layer_dir, n_kmers) {
return Some(OKIError::Io(e));
}
}
None
})
.collect();
if let Some(e) = errors.into_iter().next() { return Err(e); }
Ok(())
}
} }
fn label_from_path(path: &Path) -> String { fn label_from_path(path: &Path) -> String {
+47 -1
View File
@@ -63,8 +63,36 @@ impl KmerIndex {
} }
} }
// ── Choose base: largest source in the output evidence mode ─────────── // ── Log source characteristics and choose base ────────────────────────
let mode_str = if mode == MergeMode::Presence { "presence" } else { "count" };
info!(
"merge: {} source(s), smer-size={}, mode={}",
sources.len(), sources[0].kmer_size(), mode_str,
);
for (i, src) in sources.iter().enumerate() {
let genome_str = if src.meta.genomes.len() == 1 { "mono-genome".to_string() }
else { format!("{} genomes", src.meta.genomes.len()) };
let trivial_str = if is_trivial(src, mode) { " [trivial: no data approximation]" } else { "" };
info!(
" [{}] {} — {}, {}, {}{}",
i, src.root_path.display(),
format_evidence(&src.meta.config.evidence),
genome_str, mode_str, trivial_str,
);
}
let base_idx = choose_base(sources, mode); let base_idx = choose_base(sources, mode);
let needs_approx = sources.iter().any(|src| {
!is_trivial(src, mode)
&& matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. })
});
info!(
"output evidence: {} ({}base: [{}] {})",
format_evidence(&sources[base_idx].meta.config.evidence),
if needs_approx { "forced approx — " } else { "" },
base_idx, sources[base_idx].root_path.display(),
);
let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len()); let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len());
ordered.push(sources[base_idx]); ordered.push(sources[base_idx]);
for (i, &src) in sources.iter().enumerate() { for (i, &src) in sources.iter().enumerate() {
@@ -168,6 +196,16 @@ impl KmerIndex {
rep.push(t.stop()); rep.push(t.stop());
} }
// ── Pack matrices after merge ─────────────────────────────────────────
{
let t = Stage::start("pack");
let pb = spinner("pack — consolidating column files …");
let dst2 = KmerIndex::open(output)?;
dst2.pack_matrices()?;
pb.finish_and_clear();
rep.push(t.stop());
}
// Re-open to get the updated state. // Re-open to get the updated state.
KmerIndex::open(output) KmerIndex::open(output)
} }
@@ -272,6 +310,14 @@ fn partition_bar(n: u64) -> ProgressBar {
pb pb
} }
fn format_evidence(ev: &IndexMode) -> String {
match ev {
IndexMode::Exact => "exact".to_string(),
IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"),
IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"),
}
}
/// A source is "trivial" if its presence/count values carry no approximation: /// A source is "trivial" if its presence/count values carry no approximation:
/// single-genome presence index (SetMembership — all values are 1 by construction). /// single-genome presence index (SetMembership — all values are 1 by construction).
fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool { fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool {
+21
View File
@@ -0,0 +1,21 @@
use std::sync::OnceLock;
use indicatif::MultiProgress;
static MULTI: OnceLock<MultiProgress> = OnceLock::new();
/// Initialise the shared progress display. Call once from the binary before
/// any index operation. Subsequent calls are silently ignored.
pub fn init(multi: MultiProgress) {
let _ = MULTI.set(multi);
}
/// Return the shared `MultiProgress`, creating a plain default one if the
/// binary never called [`init`].
pub fn get() -> &'static MultiProgress {
MULTI.get_or_init(MultiProgress::new)
}
pub(crate) fn multi() -> &'static MultiProgress {
get()
}
-3
View File
@@ -4,7 +4,6 @@ use std::path::PathBuf;
use clap::Args; use clap::Args;
use kodama::{Method, linkage}; use kodama::{Method, linkage};
use obikindex::{DistanceMetric, KmerIndex}; use obikindex::{DistanceMetric, KmerIndex};
use obikseq::{set_k, set_m};
use speedytree::{DistanceMatrix, Hybrid, NeighborJoiningSolver, to_newick}; use speedytree::{DistanceMatrix, Hybrid, NeighborJoiningSolver, to_newick};
use tracing::info; use tracing::info;
@@ -72,8 +71,6 @@ pub fn run(args: DistanceArgs) {
std::process::exit(1); std::process::exit(1);
}); });
set_k(idx.kmer_size());
set_m(idx.minimizer_size());
let labels: Vec<String> = idx.meta().genomes.iter().map(|g| g.label.clone()).collect(); let labels: Vec<String> = idx.meta().genomes.iter().map(|g| g.label.clone()).collect();
let n = labels.len(); let n = labels.len();
-2
View File
@@ -3,7 +3,6 @@ use std::path::PathBuf;
use clap::Args; use clap::Args;
use obikindex::KmerIndex; use obikindex::KmerIndex;
use obikseq::set_k;
use tracing::info; use tracing::info;
#[derive(Args)] #[derive(Args)]
@@ -26,7 +25,6 @@ pub fn run(args: DumpArgs) {
std::process::exit(1); std::process::exit(1);
}); });
set_k(idx.kmer_size());
info!( info!(
"dumping {} partitions, {} genome(s)", "dumping {} partitions, {} genome(s)",
idx.n_partitions(), idx.n_partitions(),
-3
View File
@@ -8,7 +8,6 @@ fn parse_key_value(s: &str) -> Result<(String, String), String> {
let pos = s.find('=').ok_or_else(|| format!("invalid key=value: no '=' in '{s}'"))?; let pos = s.find('=').ok_or_else(|| format!("invalid key=value: no '=' in '{s}'"))?;
Ok((s[..pos].to_string(), s[pos + 1..].to_string())) Ok((s[..pos].to_string(), s[pos + 1..].to_string()))
} }
use obikseq::{set_k, set_m};
use obisys::Reporter; use obisys::Reporter;
use tracing::info; use tracing::info;
@@ -222,8 +221,6 @@ pub fn run(args: IndexArgs) {
}) })
}; };
set_k(idx.kmer_size());
set_m(idx.minimizer_size());
// ── Stage 1: scatter ───────────────────────────────────────────────────── // ── Stage 1: scatter ─────────────────────────────────────────────────────
if idx.state() < IndexState::Scattered { if idx.state() < IndexState::Scattered {
-3
View File
@@ -2,7 +2,6 @@ use std::path::PathBuf;
use clap::Args; use clap::Args;
use obikindex::{KmerIndex, MergeMode}; use obikindex::{KmerIndex, MergeMode};
use obikseq::{set_k, set_m};
use obisys::Reporter; use obisys::Reporter;
use tracing::info; use tracing::info;
@@ -53,8 +52,6 @@ pub fn run(args: MergeArgs) {
let source_refs: Vec<&KmerIndex> = sources.iter().collect(); let source_refs: Vec<&KmerIndex> = sources.iter().collect();
set_k(sources[0].kmer_size());
set_m(sources[0].minimizer_size());
let n_genomes: usize = sources.iter().map(|s| s.meta().genomes.len()).sum(); let n_genomes: usize = sources.iter().map(|s| s.meta().genomes.len()).sum();
info!( info!(
+1
View File
@@ -1,4 +1,5 @@
pub mod annotate; pub mod annotate;
pub mod pack;
pub mod utils; pub mod utils;
pub mod distance; pub mod distance;
pub mod dump; pub mod dump;
+36
View File
@@ -0,0 +1,36 @@
use std::path::PathBuf;
use clap::Args;
use obikindex::KmerIndex;
use obisys::{Reporter, Stage};
use tracing::info;
#[derive(Args)]
pub struct PackArgs {
/// Index directory to pack
pub index: PathBuf,
}
pub fn run(args: PackArgs) {
let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| {
eprintln!("error opening index: {e}");
std::process::exit(1);
});
info!(
"pack: {} partition(s), {} genome(s)",
idx.n_partitions(),
idx.meta().genomes.len(),
);
let mut rep = Reporter::new();
let t = Stage::start("pack");
idx.pack_matrices().unwrap_or_else(|e| {
eprintln!("pack error: {e}");
std::process::exit(1);
});
rep.push(t.stop());
rep.print();
}
+239 -122
View File
@@ -5,12 +5,13 @@ use std::sync::Arc;
use clap::Args; use clap::Args;
use obikindex::KmerIndex; use obikindex::KmerIndex;
use obilayeredmap::IndexMode;
use obiread::chunk::read_sequence_chunks;
use obiread::record::{SeqRecord, parse_chunk};
use obikrope::Rope; use obikrope::Rope;
use obikseq::{RoutableSuperKmer, set_k, set_m}; use obikseq::RoutableSuperKmer;
use obilayeredmap::IndexMode;
use obiread::chunk::read_sequence_chunks_sized;
use obiread::record::{SeqRecord, parse_chunk};
use obiskbuilder::SuperKmerIter; use obiskbuilder::SuperKmerIter;
use obisys::available_memory_bytes;
use tracing::info; use tracing::info;
// ── Pipeline data ───────────────────────────────────────────────────────────── // ── Pipeline data ─────────────────────────────────────────────────────────────
@@ -69,6 +70,10 @@ pub struct QueryArgs {
.unwrap_or(1) .unwrap_or(1)
)] )]
pub threads: usize, pub threads: usize,
/// I/O chunk size in MiB (default: auto-sized from available RAM and thread count)
#[arg(long)]
pub chunk_size: Option<usize>,
} }
// ── SKDesc — one occurrence of a superkmer in the batch ─────────────────────── // ── SKDesc — one occurrence of a superkmer in the batch ───────────────────────
@@ -97,25 +102,24 @@ pub struct QueryBatch {
impl QueryBatch { impl QueryBatch {
/// Build a batch from a vec of parsed sequence records. /// Build a batch from a vec of parsed sequence records.
pub fn from_records( pub fn from_records(records: Vec<SeqRecord>, k: usize, level_max: usize, theta: f64) -> Self {
records: Vec<SeqRecord>,
k: usize,
level_max: usize,
theta: f64,
) -> Self {
let mut ids = Vec::with_capacity(records.len()); let mut ids = Vec::with_capacity(records.len());
let mut seqs = Vec::with_capacity(records.len()); let mut seqs = Vec::with_capacity(records.len());
let mut n_kmers = Vec::with_capacity(records.len()); let mut n_kmers = Vec::with_capacity(records.len());
let mut map: HashMap<RoutableSuperKmer, Vec<SKDesc>> = HashMap::new(); // Upper-bound estimate: at most one superkmer per k bases.
// Avoids repeated rehash on large chunks.
let cap = records.iter().map(|r| r.normalized.len()).sum::<usize>() / k.max(1);
let mut map: HashMap<RoutableSuperKmer, Vec<SKDesc>> = HashMap::with_capacity(cap);
for (seq_idx, record) in records.into_iter().enumerate() { for (seq_idx, record) in records.into_iter().enumerate() {
let mut kmer_offset = 0u32; let mut kmer_offset = 0u32;
for rsk in SuperKmerIter::new(&record.normalized, k, level_max, theta) { for rsk in SuperKmerIter::new(&record.normalized, k, level_max, theta) {
let n = (rsk.seql() - k + 1) as u32; let n = (rsk.seql() - k + 1) as u32;
map.entry(rsk) map.entry(rsk).or_default().push(SKDesc {
.or_default() seq_idx: seq_idx as u32,
.push(SKDesc { seq_idx: seq_idx as u32, kmer_offset }); kmer_offset,
});
kmer_offset += n; kmer_offset += n;
} }
@@ -124,7 +128,12 @@ impl QueryBatch {
n_kmers.push(kmer_offset); n_kmers.push(kmer_offset);
} }
Self { ids, seqs, n_kmers, map } Self {
ids,
seqs,
n_kmers,
map,
}
} }
/// Split the superkmer map by partition index. /// Split the superkmer map by partition index.
@@ -139,6 +148,59 @@ impl QueryBatch {
} }
} }
// ── KmerResults — allocation-free ragged result matrix ───────────────────────
/// Flat storage for per-kmer query results across all sequences in a chunk.
///
/// Replaces `Vec<Vec<Option<Box<[u32]>>>>` — a single allocation for the whole
/// chunk instead of one `Box<[u32]>` per found k-mer.
struct KmerResults {
data: Vec<u32>, // total_kmers × n_genomes, row-major
in_index: Vec<bool>, // total_kmers — true if the kmer was found in the index
offsets: Vec<usize>, // offsets[i]..offsets[i+1] = kmer range for sequence i
n_genomes: usize,
}
impl KmerResults {
fn new(n_kmers_per_seq: &[u32], n_genomes: usize) -> Self {
let mut offsets = Vec::with_capacity(n_kmers_per_seq.len() + 1);
let mut total = 0usize;
offsets.push(0);
for &n in n_kmers_per_seq {
total += n as usize;
offsets.push(total);
}
Self {
data: vec![0u32; total * n_genomes],
in_index: vec![false; total],
offsets,
n_genomes,
}
}
fn n_kmers_for(&self, seq: usize) -> usize {
self.offsets[seq + 1] - self.offsets[seq]
}
fn set(&mut self, seq: usize, kmer: usize, row: &[u32]) {
let abs = self.offsets[seq] + kmer;
self.in_index[abs] = true;
let base = abs * self.n_genomes;
self.data[base..base + self.n_genomes].copy_from_slice(row);
}
#[inline]
fn is_in_index(&self, seq: usize, kmer: usize) -> bool {
self.in_index[self.offsets[seq] + kmer]
}
/// Value for genome `g` at (seq, kmer); meaningful only when `is_in_index`.
#[inline]
fn val(&self, seq: usize, kmer: usize, g: usize) -> u32 {
self.data[(self.offsets[seq] + kmer) * self.n_genomes + g]
}
}
// ── Per-sequence accumulator ────────────────────────────────────────────────── // ── Per-sequence accumulator ──────────────────────────────────────────────────
struct SeqAcc { struct SeqAcc {
@@ -157,56 +219,6 @@ impl SeqAcc {
} }
} }
// ── Findere z-window filter ───────────────────────────────────────────────────
/// Apply the Findere z-window filter to per-kmer query results for one superkmer.
/// Aggregate s-mer query results into k-mer answers using a Findere z-window.
///
/// Input: N s-mer results (indexed kmer size s = k z + 1).
/// Output: N z + 1 k-mer results (user kmer size k).
///
/// For each genome g independently: k-mer at position i is confirmed iff all z values
/// results[i..i+z][g] are nonzero (None counts as zero for all genomes).
/// Output values are taken from results[i]; genomes not confirmed are zeroed.
fn apply_findere(
results: &[Option<Box<[u32]>>],
z: usize,
n_genomes: usize,
) -> Vec<Option<Box<[u32]>>> {
let n = results.len();
if z <= 1 {
return results.iter().map(|r| r.as_ref().map(|row| row.clone())).collect();
}
if n < z {
return Vec::new();
}
let out_n = n - z + 1;
let mut confirmed = vec![vec![false; n_genomes]; out_n];
for g in 0..n_genomes {
let hit = |i: usize| results[i].as_ref().map_or(false, |r| r[g] > 0);
let mut count: u32 = (0..z).filter(|&j| hit(j)).count() as u32;
if count == z as u32 { confirmed[0][g] = true; }
for i in 1..out_n {
if hit(i - 1) { count -= 1; }
if hit(i + z - 1) { count += 1; }
if count == z as u32 { confirmed[i][g] = true; }
}
}
(0..out_n).map(|i| {
let first = results[i].as_ref()?;
let mut row: Box<[u32]> = first.clone();
for g in 0..n_genomes {
if !confirmed[i][g] { row[g] = 0; }
}
if row.iter().any(|&v| v > 0) { Some(row) } else { None }
}).collect()
}
// ── process_chunk ───────────────────────────────────────────────────────────── // ── process_chunk ─────────────────────────────────────────────────────────────
fn process_chunk( fn process_chunk(
@@ -230,11 +242,8 @@ fn process_chunk(
let batch = QueryBatch::from_records(records, k, 6, 0.7); let batch = QueryBatch::from_records(records, k, 6, 0.7);
let n_seqs = batch.ids.len(); let n_seqs = batch.ids.len();
// Per-sequence s-mer result vectors in global sequence position order. // Flat result matrix — one allocation for the whole chunk.
// All partitions fill into this structure before Findere is applied. let mut results = KmerResults::new(&batch.n_kmers, n_genomes);
let mut seq_results: Vec<Vec<Option<Box<[u32]>>>> = batch.n_kmers.iter()
.map(|&n| vec![None; n as usize])
.collect();
let by_part = batch.split_by_partition(n_partitions); let by_part = batch.split_by_partition(n_partitions);
@@ -243,36 +252,54 @@ fn process_chunk(
continue; continue;
} }
let kmer_results = idx idx.partition()
.partition() .query_partition_with(
.query_partition(part_idx, part_sks, k, n_genomes, with_counts) part_idx,
part_sks,
k,
n_genomes,
with_counts,
|sk_idx, kmer_idx, row| {
let rsk = part_sks[sk_idx];
let descs = batch.map.get(rsk).expect("rsk must be in map");
for desc in descs {
results.set(
desc.seq_idx as usize,
desc.kmer_offset as usize + kmer_idx,
row,
);
}
},
)
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
eprintln!("query error on partition {part_idx}: {e}"); eprintln!("query error on partition {part_idx}: {e}");
std::process::exit(1); std::process::exit(1);
}); });
for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) {
let descs = batch.map.get(*rsk).expect("rsk must be in map");
for desc in descs {
let offset = desc.kmer_offset as usize;
let dst = &mut seq_results[desc.seq_idx as usize];
for (j, hit) in sk_kmer_results.iter().enumerate() {
dst[offset + j] = hit.as_ref().map(|r| r.clone());
}
}
}
} }
// Apply Findere on each complete sequence vector, then accumulate. // Findere z-window filter + accumulation — no intermediate allocations.
let n_kmers_out: Vec<usize> = batch.n_kmers.iter() // One `confirmed` buffer reused across all sequences.
.map(|&n| { let n = n as usize; if n >= effective_z { n - effective_z + 1 } else { 0 } }) let max_n_kmers = batch.n_kmers.iter().map(|&n| n as usize).max().unwrap_or(0);
let mut confirmed = vec![false; max_n_kmers * n_genomes];
let mut accs: Vec<SeqAcc> = (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
let n_kmers_out: Vec<usize> = batch
.n_kmers
.iter()
.map(|&n| {
let n = n as usize;
if n >= effective_z {
n - effective_z + 1
} else {
0
}
})
.collect(); .collect();
let mut accs: Vec<SeqAcc> =
(0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
let mut cov: Vec<Vec<Vec<u32>>> = if detail { let mut cov: Vec<Vec<Vec<u32>>> = if detail {
n_kmers_out.iter() n_kmers_out
.iter()
.map(|&n| vec![vec![0u32; n]; n_genomes]) .map(|&n| vec![vec![0u32; n]; n_genomes])
.collect() .collect()
} else { } else {
@@ -281,39 +308,114 @@ fn process_chunk(
let presence = force_presence || !with_counts; let presence = force_presence || !with_counts;
let threshold = presence_threshold; let threshold = presence_threshold;
let z = effective_z;
for seq_idx in 0..n_seqs { for seq_idx in 0..n_seqs {
let filtered = apply_findere(&seq_results[seq_idx], effective_z, n_genomes); let n = results.n_kmers_for(seq_idx);
let acc = &mut accs[seq_idx]; let out_n = n_kmers_out[seq_idx];
if out_n == 0 {
continue;
}
for (pos, hit) in filtered.iter().enumerate() { if z <= 1 {
match hit { // No Findere — every indexed s-mer is confirmed.
None => { let acc = &mut accs[seq_idx];
if seq_results[seq_idx][pos].is_none() { for pos in 0..n {
if !results.is_in_index(seq_idx, pos) {
acc.kmer_missing += 1; acc.kmer_missing += 1;
continue;
} }
}
Some(row) => {
acc.kmer_count += 1; acc.kmer_count += 1;
for (g, &v) in row.iter().enumerate() { for g in 0..n_genomes {
if v == 0 { continue; } let v = results.val(seq_idx, pos, g);
let contribution = if presence { if v == 0 {
continue;
}
let c = if presence {
u32::from(v >= threshold) u32::from(v >= threshold)
} else { } else {
v v
}; };
acc.genome_totals[g] += contribution; acc.genome_totals[g] += c;
if detail { if detail {
cov[seq_idx][g][pos] += contribution; cov[seq_idx][g][pos] += c;
} }
} }
} }
} else {
// Build confirmed[pos * n_genomes + g] via sliding window.
let conf = &mut confirmed[..out_n * n_genomes];
conf.fill(false);
for g in 0..n_genomes {
let hit =
|i: usize| results.is_in_index(seq_idx, i) && results.val(seq_idx, i, g) > 0;
let mut cnt: u32 = (0..z).filter(|&j| hit(j)).count() as u32;
if cnt == z as u32 {
conf[g] = true;
}
for i in 1..out_n {
if hit(i - 1) {
cnt -= 1;
}
if hit(i + z - 1) {
cnt += 1;
}
if cnt == z as u32 {
conf[i * n_genomes + g] = true;
}
}
}
let acc = &mut accs[seq_idx];
for pos in 0..out_n {
let any = (0..n_genomes).any(|g| conf[pos * n_genomes + g]);
if !any {
if !results.is_in_index(seq_idx, pos) {
acc.kmer_missing += 1;
}
continue;
}
acc.kmer_count += 1;
for g in 0..n_genomes {
if !conf[pos * n_genomes + g] {
continue;
}
let v = results.val(seq_idx, pos, g);
if v == 0 {
continue;
}
let c = if presence {
u32::from(v >= threshold)
} else {
v
};
acc.genome_totals[g] += c;
if detail {
cov[seq_idx][g][pos] += c;
}
}
} }
} }
} }
let mut buf = Vec::new(); // Capacity estimate: actual sequence + ID bytes, plus JSON overhead per record.
emit_batch(&batch, &accs, idx.meta(), count_missing, detail, &cov, &mut buf); // JSON per record ≈ 50 fixed chars + ~20 per genome (label + count value) + 100 (overhead).
let seq_bytes: usize = batch.seqs.iter().map(|s| s.len()).sum();
let id_bytes: usize = batch.ids.iter().map(|s| s.len()).sum();
let cap = seq_bytes + id_bytes + n_seqs * (4 + 50 + n_genomes * 20) + 100;
let mut buf = Vec::with_capacity(cap);
emit_batch(
&batch,
&accs,
idx.meta(),
count_missing,
detail,
&cov,
&mut buf,
);
buf buf
} }
@@ -325,20 +427,29 @@ pub fn run(args: QueryArgs) {
std::process::exit(1); std::process::exit(1);
})); }));
set_k(idx.kmer_size());
set_m(idx.minimizer_size());
let k = idx.kmer_size(); let k = idx.kmer_size();
let n_genomes = idx.meta().genomes.len(); let n_genomes = idx.meta().genomes.len();
let n_partitions = idx.n_partitions(); let n_partitions = idx.n_partitions();
let with_counts = idx.meta().config.with_counts; let with_counts = idx.meta().config.with_counts;
let n_workers = args.threads.max(1); let n_workers = args.threads.max(1);
let effective_z: usize = args.findere_z.unwrap_or_else(|| { // Chunk size: each chunk stays in memory for its entire processing lifetime.
match idx.meta().config.evidence { // Overhead per raw byte is ~8× (Rope + parsed records + superkmers + results).
// We target ≤ 50 % of available RAM across all concurrent workers.
let chunk_bytes = args
.chunk_size
.map(|mb| mb * 1024 * 1024)
.unwrap_or_else(|| {
let avail = available_memory_bytes();
let computed = avail / (n_workers as u64 * 16);
computed.clamp(4 * 1024 * 1024, 256 * 1024 * 1024) as usize
});
let effective_z: usize = args
.findere_z
.unwrap_or_else(|| match idx.meta().config.evidence {
IndexMode::Approx { z, .. } | IndexMode::Hybrid { z, .. } => z as usize, IndexMode::Approx { z, .. } | IndexMode::Hybrid { z, .. } => z as usize,
IndexMode::Exact => 1, IndexMode::Exact => 1,
}
}); });
info!( info!(
@@ -358,13 +469,18 @@ pub fn run(args: QueryArgs) {
// Flat iterator over all Rope chunks from all input files. // Flat iterator over all Rope chunks from all input files.
// I/O runs in the source thread; chunk processing is parallelised by the pipe. // I/O runs in the source thread; chunk processing is parallelised by the pipe.
info!("query: chunk_size={}MiB", chunk_bytes / (1024 * 1024));
let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect(); let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect();
let all_chunks = paths.into_iter().flat_map(|path| { let all_chunks = paths.into_iter().flat_map(move |path| {
let path_str = path.to_str().unwrap_or("").to_owned(); let path_str = path.to_str().unwrap_or("").to_owned();
match read_sequence_chunks(&path_str) { match read_sequence_chunks_sized(&path_str, chunk_bytes) {
Ok(iter) => Box::new(iter.filter_map(|r| match r { Ok(iter) => Box::new(iter.filter_map(|r| match r {
Ok(rope) => Some(rope), Ok(rope) => Some(rope),
Err(e) => { eprintln!("read error: {e}"); None } Err(e) => {
eprintln!("read error: {e}");
None
}
})) as Box<dyn Iterator<Item = Rope> + Send>, })) as Box<dyn Iterator<Item = Rope> + Send>,
Err(e) => { Err(e) => {
eprintln!("error opening {path_str}: {e}"); eprintln!("error opening {path_str}: {e}");
@@ -379,8 +495,8 @@ pub fn run(args: QueryArgs) {
let idx = Arc::clone(&idx); let idx = Arc::clone(&idx);
move |rope: Rope| { move |rope: Rope| {
process_chunk( process_chunk(
&idx, rope, k, n_genomes, n_partitions, with_counts, effective_z, &idx, rope, k, n_genomes, n_partitions, with_counts,
detail, count_missing, force_presence, presence_threshold, effective_z, detail, count_missing, force_presence, presence_threshold,
) )
} }
} : Chunk => Output, } : Chunk => Output,
@@ -424,17 +540,18 @@ fn emit_batch(
if detail && !cov.is_empty() { if detail && !cov.is_empty() {
let mut cov_map = serde_json::Map::new(); let mut cov_map = serde_json::Map::new();
for (g, genome) in meta.genomes.iter().enumerate() { for (g, genome) in meta.genomes.iter().enumerate() {
let v: Vec<serde_json::Value> = let v: Vec<serde_json::Value> = cov[seq_idx][g].iter().map(|&x| x.into()).collect();
cov[seq_idx][g].iter().map(|&x| x.into()).collect();
cov_map.insert(genome.label.clone(), v.into()); cov_map.insert(genome.label.clone(), v.into());
} }
ann.insert("coverage".into(), cov_map.into()); ann.insert("coverage".into(), cov_map.into());
} }
let ann_str = serde_json::to_string(&ann).unwrap_or_else(|_| "{}".to_string());
// OBITools4 FASTA format: >id {"key":value,...} // OBITools4 FASTA format: >id {"key":value,...}
let _ = writeln!(out, ">{id} {ann_str}"); let _ = out.write_all(b">");
let _ = out.write_all(id.as_bytes());
let _ = out.write_all(b" ");
let _ = serde_json::to_writer(&mut *out, &ann);
let _ = out.write_all(b"\n");
let _ = out.write_all(seq); let _ = out.write_all(seq);
let _ = out.write_all(b"\n"); let _ = out.write_all(b"\n");
} }
-3
View File
@@ -7,7 +7,6 @@ use obikpartitionner::filter::{
MinGenomeCount, MinGenomeFraction, MinTotalCount, MinGenomeCount, MinGenomeFraction, MinTotalCount,
}; };
use obisys::Reporter; use obisys::Reporter;
use obikseq::{set_k, set_m};
use tracing::info; use tracing::info;
#[derive(Args)] #[derive(Args)]
@@ -62,8 +61,6 @@ pub fn run(args: RebuildArgs) {
std::process::exit(1); std::process::exit(1);
}); });
set_k(src.kmer_size());
set_m(src.minimizer_size());
let n_genomes = src.meta().genomes.len(); let n_genomes = src.meta().genomes.len();
let mode = if args.presence || !src.meta().config.with_counts { let mode = if args.presence || !src.meta().config.with_counts {
+22 -1
View File
@@ -12,6 +12,10 @@ pub struct UtilsArgs {
/// Set a new genome label: NEW_LABEL=OLD_LABEL /// Set a new genome label: NEW_LABEL=OLD_LABEL
#[arg(long, value_name = "NEW=OLD")] #[arg(long, value_name = "NEW=OLD")]
pub new_label: Option<String>, pub new_label: Option<String>,
/// Add missing layer_meta.json files to each layer (required after upgrading from old indexes)
#[arg(long)]
pub upgrade_index: bool,
} }
pub fn run(args: UtilsArgs) { pub fn run(args: UtilsArgs) {
@@ -22,12 +26,29 @@ pub fn run(args: UtilsArgs) {
run_rename(&args.index, spec); run_rename(&args.index, spec);
} }
if args.upgrade_index {
any = true;
run_upgrade_index(&args.index);
}
if !any { if !any {
eprintln!("utils: no operation specified. Available options: --new-label NEW=OLD"); eprintln!("utils: no operation specified. Available options: --new-label NEW=OLD, --upgrade-index");
std::process::exit(1); std::process::exit(1);
} }
} }
fn run_upgrade_index(index_path: &PathBuf) {
let idx = KmerIndex::open(index_path).unwrap_or_else(|e| {
eprintln!("error opening index: {e}");
std::process::exit(1);
});
idx.upgrade_layer_meta().unwrap_or_else(|e| {
eprintln!("upgrade error: {e}");
std::process::exit(1);
});
info!("upgrade-index: layer_meta.json written to all layers that were missing it");
}
fn run_rename(index_path: &PathBuf, spec: &str) { fn run_rename(index_path: &PathBuf, spec: &str) {
let (old_label, new_label) = parse_rename_spec(spec); let (old_label, new_label) = parse_rename_spec(spec);
+3
View File
@@ -38,6 +38,8 @@ enum Commands {
Reindex(cmd::reindex::ReindexArgs), Reindex(cmd::reindex::ReindexArgs),
/// Miscellaneous index utilities (--rename, …) /// Miscellaneous index utilities (--rename, …)
Utils(cmd::utils::UtilsArgs), Utils(cmd::utils::UtilsArgs),
/// Pack matrix column files into single-file format to reduce query I/O
Pack(cmd::pack::PackArgs),
} }
fn main() { fn main() {
@@ -71,6 +73,7 @@ fn main() {
Commands::Estimate(args) => cmd::estimate::run(args), Commands::Estimate(args) => cmd::estimate::run(args),
Commands::Reindex(args) => cmd::reindex::run(args), Commands::Reindex(args) => cmd::reindex::run(args),
Commands::Utils(args) => cmd::utils::run(args), Commands::Utils(args) => cmd::utils::run(args),
Commands::Pack(args) => cmd::pack::run(args),
} }
#[cfg(feature = "profiling")] #[cfg(feature = "profiling")]
+4 -1
View File
@@ -95,10 +95,13 @@ pub fn scatter(
let mut ema_rate: f64 = 0.0; let mut ema_rate: f64 = 0.0;
let mut last_t = Instant::now(); let mut last_t = Instant::now();
let mut last_bases: u64 = 0; let mut last_bases: u64 = 0;
let kmer_overlap = (k - 1) as u64;
const ALPHA: f64 = 0.15; const ALPHA: f64 = 0.15;
for batch in pipe.apply(throttled, n_workers, 1) { for batch in pipe.apply(throttled, n_workers, 1) {
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>(); total_bases += batch.iter()
.map(|sk| (sk.seql() as u64).saturating_sub(kmer_overlap))
.sum::<u64>();
let now = Instant::now(); let now = Instant::now();
let dt = now.duration_since(last_t).as_secs_f64(); let dt = now.duration_since(last_t).as_secs_f64();
if dt > 0.1 { if dt > 0.1 {
+6 -4
View File
@@ -22,8 +22,9 @@ impl KmerPartition {
} }
let matrices = (0..probe_n_layers(&index_dir)) let matrices = (0..probe_n_layers(&index_dir))
.filter_map(|l| { .filter_map(|l| {
let dir = index_dir.join(format!("layer_{l}")).join("counts"); let layer_dir = index_dir.join(format!("layer_{l}"));
dir.exists().then(|| PersistentCompactIntMatrix::open(&dir).map_err(SKError::Io)) layer_dir.join("counts").exists()
.then(|| PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io))
}) })
.collect::<SKResult<Vec<_>>>()?; .collect::<SKResult<Vec<_>>>()?;
Ok(LayeredStore::new(matrices)) Ok(LayeredStore::new(matrices))
@@ -38,8 +39,9 @@ impl KmerPartition {
} }
let matrices = (0..probe_n_layers(&index_dir)) let matrices = (0..probe_n_layers(&index_dir))
.filter_map(|l| { .filter_map(|l| {
let dir = index_dir.join(format!("layer_{l}")).join("presence"); let layer_dir = index_dir.join(format!("layer_{l}"));
dir.exists().then(|| PersistentBitMatrix::open(&dir).map_err(SKError::Io)) layer_dir.join("presence").exists()
.then(|| PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io))
}) })
.collect::<SKResult<Vec<_>>>()?; .collect::<SKResult<Vec<_>>>()?;
Ok(LayeredStore::new(matrices)) Ok(LayeredStore::new(matrices))
+4 -4
View File
@@ -51,14 +51,14 @@ impl KmerPartition {
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
if use_counts && counts_dir.exists() { if use_counts && counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
cb(kmer, mat.row(slot)); cb(kmer, mat.row(slot));
} }
} }
} else if !use_counts && presence_dir.exists() { } else if !use_counts && presence_dir.exists() {
let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect();
@@ -108,14 +108,14 @@ impl KmerPartition {
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
if use_counts && counts_dir.exists() { if use_counts && counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
cb(part, layer, kmer, mat.row(slot)); cb(part, layer, kmer, mat.row(slot));
} }
} }
} else if !use_counts && presence_dir.exists() { } else if !use_counts && presence_dir.exists() {
let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect();
+15 -20
View File
@@ -45,37 +45,35 @@ impl ColBuilder {
// ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── // ── SrcLayerData — opened source matrix for pass-2 lookup ─────────────────────
pub(crate) enum SrcLayerData { pub(crate) enum SrcLayerData {
/// Pure set-membership layer (no data matrix): every kmer is present in all genomes.
SetMembership,
Presence(MphfOnly, PersistentBitMatrix), Presence(MphfOnly, PersistentBitMatrix),
Count(MphfOnly, PersistentCompactIntMatrix), Count(MphfOnly, PersistentCompactIntMatrix),
} }
impl SrcLayerData { impl SrcLayerData {
pub(crate) fn open(layer_dir: &Path, merge_mode: MergeMode) -> SKResult<Self> { pub(crate) fn open(layer_dir: &Path, merge_mode: MergeMode) -> SKResult<Self> {
let presence_dir = layer_dir.join("presence");
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
match merge_mode { match merge_mode {
MergeMode::Presence => { MergeMode::Presence => {
if presence_dir.exists() { if counts_dir.exists() && !layer_dir.join("presence").exists() {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Presence(mphf, mat))
} else if counts_dir.exists() {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Count(mphf, mat)) Ok(SrcLayerData::Count(mphf, mat))
} else { } else {
Ok(SrcLayerData::SetMembership) // presence dir exists, or neither exists → Implicit handled by open()
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Presence(mphf, mat))
} }
} }
MergeMode::Count => { MergeMode::Count => {
if counts_dir.exists() {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; if counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Count(mphf, mat)) Ok(SrcLayerData::Count(mphf, mat))
} else { } else {
Ok(SrcLayerData::SetMembership) // No counts → treat as implicit presence (all 1s)
let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Presence(mphf, mat))
} }
} }
} }
@@ -85,15 +83,12 @@ impl SrcLayerData {
/// The caller guarantees `kmer` is in the source MPHF domain. /// The caller guarantees `kmer` is in the source MPHF domain.
#[inline] #[inline]
pub(crate) fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec<u32> { pub(crate) fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec<u32> {
let mut buf = vec![0u32; n_genomes];
match self { match self {
SrcLayerData::SetMembership => vec![1u32; n_genomes], SrcLayerData::Presence(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf),
SrcLayerData::Presence(mphf, mat) => { SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf),
mat.row(mphf.index(kmer)).iter().map(|&b| b as u32).collect()
}
SrcLayerData::Count(mphf, mat) => {
mat.row(mphf.index(kmer)).iter().copied().collect()
}
} }
buf
} }
} }
+82 -36
View File
@@ -20,70 +20,108 @@ fn olm_to_sk(e: OLMError) -> SKError {
// ── per-layer query handle ──────────────────────────────────────────────────── // ── per-layer query handle ────────────────────────────────────────────────────
enum QueryLayer { enum QueryLayer {
/// Layer<()> — MPHF-only, no data matrix; all indexed kmers map to 1 per genome.
SetOnly(MphfLayer),
Presence(MphfLayer, PersistentBitMatrix), Presence(MphfLayer, PersistentBitMatrix),
Count(MphfLayer, PersistentCompactIntMatrix), Count(MphfLayer, PersistentCompactIntMatrix),
} }
impl QueryLayer { impl QueryLayer {
fn open(layer_dir: &Path, with_counts: bool, mode: &IndexMode) -> SKResult<Self> { fn open(layer_dir: &Path, with_counts: bool, mode: &IndexMode) -> SKResult<Self> {
let presence_dir = layer_dir.join("presence"); let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?;
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
let presence_dir = layer_dir.join("presence");
if with_counts && counts_dir.exists() { if with_counts && counts_dir.exists() {
let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?;
Ok(QueryLayer::Count(mphf, mat)) Ok(QueryLayer::Count(mphf, mat))
} else if presence_dir.exists() { } else if presence_dir.exists() || !counts_dir.exists() {
let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; // presence mode, or no matrix at all → Implicit handled inside open()
let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(QueryLayer::Presence(mphf, mat)) Ok(QueryLayer::Presence(mphf, mat))
} else if counts_dir.exists() {
// presence query on a count index — return counts as-is
let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?;
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?;
Ok(QueryLayer::Count(mphf, mat))
} else { } else {
let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; // counts exist but not presence — count layer, no presence requested
Ok(QueryLayer::SetOnly(mphf)) let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(QueryLayer::Count(mphf, mat))
} }
} }
/// Return `Some(per-genome row)` if `kmer` is indexed in this layer, else `None`. /// Write per-genome values into `buf` if `kmer` is indexed; returns true on hit.
fn find(&self, kmer: CanonicalKmer, n_genomes: usize) -> Option<Box<[u32]>> { fn find_into(&self, kmer: CanonicalKmer, n_genomes: usize, buf: &mut [u32]) -> bool {
match self { match self {
QueryLayer::SetOnly(mphf) => {
mphf.find(kmer)
.map(|_| vec![1u32; n_genomes].into_boxed_slice())
}
QueryLayer::Presence(mphf, mat) => { QueryLayer::Presence(mphf, mat) => {
mphf.find(kmer) if let Some(slot) = mphf.find(kmer) {
.map(|slot| mat.row(slot).iter().map(|&b| b as u32).collect()) mat.fill_row(slot, &mut buf[..n_genomes]);
true
} else {
false
}
} }
QueryLayer::Count(mphf, mat) => { QueryLayer::Count(mphf, mat) => {
mphf.find(kmer).map(|slot| mat.row(slot)) if let Some(slot) = mphf.find(kmer) {
mat.fill_row(slot, &mut buf[..n_genomes]);
true
} else {
false
}
} }
} }
} }
} }
// ── KmerPartition::query_partition ────────────────────────────────────────── // ── KmerPartition::query_partition* ──────────────────────────────────────────
impl KmerPartition { impl KmerPartition {
/// Query a single partition for a slice of (already-routed) super-kmers. /// Query a single partition, calling `on_hit(sk_idx, kmer_idx, row)` for
/// /// every found k-mer without allocating intermediate result vectors.
/// Returns one entry per input super-kmer; each entry is a `Vec` with one pub fn query_partition_with<F>(
/// `Option<Box<[u32]>>` per k-mer inside that super-kmer: &self,
/// - `None` — k-mer absent from the index part_idx: usize,
/// - `Some(row)` — per-genome count (count index) or 0/1 (presence index) superkmers: &[&RoutableSuperKmer],
/// _k: usize,
/// All `superkmers` must belong to this partition (same minimizer bucket). n_genomes: usize,
with_counts: bool,
mut on_hit: F,
) -> SKResult<()>
where
F: FnMut(usize, usize, &[u32]),
{
if superkmers.is_empty() {
return Ok(());
}
let index_dir = self.part_dir(part_idx).join(INDEX_SUBDIR);
if !index_dir.exists() {
return Ok(());
}
let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?;
let layers: Vec<QueryLayer> = (0..meta.n_layers)
.map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode))
.collect::<SKResult<_>>()?;
let mut buf = vec![0u32; n_genomes];
for (sk_idx, rsk) in superkmers.iter().enumerate() {
for (kmer_idx, kmer) in rsk.superkmer().iter_canonical_kmers().enumerate() {
for layer in &layers {
if layer.find_into(kmer, n_genomes, &mut buf) {
on_hit(sk_idx, kmer_idx, &buf);
buf.fill(0);
break;
}
}
}
}
Ok(())
}
/// Query a single partition for a slice of super-kmers, returning per-kmer rows.
/// Prefer [`query_partition_with`] to avoid per-kmer heap allocations.
pub fn query_partition( pub fn query_partition(
&self, &self,
part_idx: usize, part_idx: usize,
superkmers: &[&RoutableSuperKmer], superkmers: &[&RoutableSuperKmer],
k: usize, _k: usize,
n_genomes: usize, n_genomes: usize,
with_counts: bool, with_counts: bool,
) -> SKResult<Vec<Vec<Option<Box<[u32]>>>>> { ) -> SKResult<Vec<Vec<Option<Box<[u32]>>>>> {
@@ -96,7 +134,7 @@ impl KmerPartition {
if !index_dir.exists() { if !index_dir.exists() {
return Ok(superkmers return Ok(superkmers
.iter() .iter()
.map(|rsk| vec![None; rsk.seql() - k + 1]) .map(|rsk| vec![None; rsk.seql()])
.collect()); .collect());
} }
@@ -105,13 +143,21 @@ impl KmerPartition {
.map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode)) .map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode))
.collect::<SKResult<_>>()?; .collect::<SKResult<_>>()?;
let mut buf = vec![0u32; n_genomes];
Ok(superkmers Ok(superkmers
.iter() .iter()
.map(|rsk| { .map(|rsk| {
rsk.superkmer() rsk.superkmer()
.iter_canonical_kmers() .iter_canonical_kmers()
.map(|kmer| { .map(|kmer| {
layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) for layer in &layers {
if layer.find_into(kmer, n_genomes, &mut buf) {
let row: Box<[u32]> = buf[..n_genomes].into();
buf.fill(0);
return Some(row);
}
}
None
}) })
.collect() .collect()
}) })
+2 -2
View File
@@ -34,7 +34,7 @@ impl LayerData for () {
impl LayerData for PersistentCompactIntMatrix { impl LayerData for PersistentCompactIntMatrix {
type Item = Box<[u32]>; type Item = Box<[u32]>;
fn open(layer_dir: &Path) -> OLMResult<Self> { fn open(layer_dir: &Path) -> OLMResult<Self> {
PersistentCompactIntMatrix::open(&layer_dir.join(COUNTS_DIR)).map_err(OLMError::Io) PersistentCompactIntMatrix::open(layer_dir).map_err(OLMError::Io)
} }
fn read(&self, slot: usize) -> Box<[u32]> { self.row(slot) } fn read(&self, slot: usize) -> Box<[u32]> { self.row(slot) }
} }
@@ -42,7 +42,7 @@ impl LayerData for PersistentCompactIntMatrix {
impl LayerData for PersistentBitMatrix { impl LayerData for PersistentBitMatrix {
type Item = Box<[bool]>; type Item = Box<[bool]>;
fn open(layer_dir: &Path) -> OLMResult<Self> { fn open(layer_dir: &Path) -> OLMResult<Self> {
PersistentBitMatrix::open(&layer_dir.join(PRESENCE_DIR)).map_err(OLMError::Io) PersistentBitMatrix::open(layer_dir).map_err(OLMError::Io)
} }
fn read(&self, slot: usize) -> Box<[bool]> { self.row(slot) } fn read(&self, slot: usize) -> Box<[bool]> { self.row(slot) }
} }
+2 -2
View File
@@ -107,7 +107,7 @@ mod tests {
fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) { fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
let n = cols.first().map_or(0, |c| c.len()); let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(n, dir.path()).unwrap(); let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
for &col in cols { for &col in cols {
let mut cb = b.add_col().unwrap(); let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); } for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
@@ -121,7 +121,7 @@ mod tests {
fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) { fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
let n = cols.first().map_or(0, |c| c.len()); let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(n, dir.path()).unwrap(); let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap();
for &col in cols { for &col in cols {
let mut cb = b.add_col().unwrap(); let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); } for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
+11 -4
View File
@@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
use cacheline_ef::{CachelineEf, CachelineEfVec}; use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*; use epserde::prelude::*;
use obicompactvec::LayerMeta;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx}; use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx};
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
@@ -17,8 +18,13 @@ pub(crate) const UNITIGS_FILE: &str = "unitigs.bin";
pub(crate) const EVIDENCE_FILE: &str = "evidence.bin"; pub(crate) const EVIDENCE_FILE: &str = "evidence.bin";
pub(crate) const FINGERPRINT_FILE: &str = "fingerprint.bin"; pub(crate) const FINGERPRINT_FILE: &str = "fingerprint.bin";
/// Owned MPHF — used only at build time (construction + store).
pub(crate) type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>; pub(crate) type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
/// Zero-copy MPHF for querying — ε-deserialized view into a memory-mapped file.
/// `MemCase` owns the mmap backing; `'static` is sound because MemCase pins the memory.
type MphfEps = PtrHash<u64, CubicEps, CachelineEfVec<&'static [CachelineEf]>, Xx64, &'static [u8]>;
// ── LayerEvidence ───────────────────────────────────────────────────────────── // ── LayerEvidence ─────────────────────────────────────────────────────────────
enum LayerEvidence { enum LayerEvidence {
@@ -36,7 +42,7 @@ enum LayerEvidence {
/// - [`find_strict`](Self::find_strict) — always exact; O(1) on Exact/Hybrid layers, /// - [`find_strict`](Self::find_strict) — always exact; O(1) on Exact/Hybrid layers,
/// O(n) sequential scan on Approx layers. /// O(n) sequential scan on Approx layers.
pub struct MphfLayer { pub struct MphfLayer {
mphf: Mphf, mphf: MemCase<MphfEps>,
ev: LayerEvidence, ev: LayerEvidence,
n: usize, n: usize,
} }
@@ -45,7 +51,7 @@ impl MphfLayer {
/// Open a layer using the index-level `mode` determined at `LayeredMap` open time. /// Open a layer using the index-level `mode` determined at `LayeredMap` open time.
/// No per-layer metadata file is read. /// No per-layer metadata file is read.
pub fn open(dir: &Path, mode: &IndexMode) -> OLMResult<Self> { pub fn open(dir: &Path, mode: &IndexMode) -> OLMResult<Self> {
let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) let mphf: MemCase<MphfEps> = Mphf::mmap(&dir.join(MPHF_FILE), Flags::empty())
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?; .map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
let (ev, n) = match mode { let (ev, n) = match mode {
IndexMode::Exact => { IndexMode::Exact => {
@@ -137,11 +143,11 @@ impl MphfLayer {
/// ///
/// Use this when the caller guarantees that all queried kmers are in the MPHF /// Use this when the caller guarantees that all queried kmers are in the MPHF
/// domain (e.g. when iterating the source's own unitigs during merge). /// domain (e.g. when iterating the source's own unitigs during merge).
pub struct MphfOnly(Mphf); pub struct MphfOnly(MemCase<MphfEps>);
impl MphfOnly { impl MphfOnly {
pub fn open(dir: &Path) -> OLMResult<Self> { pub fn open(dir: &Path) -> OLMResult<Self> {
let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) let mphf: MemCase<MphfEps> = Mphf::mmap(&dir.join(MPHF_FILE), Flags::empty())
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?; .map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
Ok(Self(mphf)) Ok(Self(mphf))
} }
@@ -336,6 +342,7 @@ impl MphfLayer {
} }
} }
LayerMeta::save(dir, n)?;
Ok(n) Ok(n)
} }
} }
+14 -2
View File
@@ -153,11 +153,23 @@ pub fn read_fastq_chunks(
/// Returns an error if the format cannot be identified as `text/fasta` or `text/fastq`. /// Returns an error if the format cannot be identified as `text/fasta` or `text/fastq`.
pub fn read_sequence_chunks( pub fn read_sequence_chunks(
path: &str, path: &str,
) -> io::Result<SeqChunkIter<MimeTypeGuesser<Box<dyn Read + Send>>>> {
read_sequence_chunks_sized(path, DEFAULT_BLOCK_SIZE)
}
/// Same as [`read_sequence_chunks`] but with an explicit I/O block size.
///
/// Larger values amortise per-partition open/close overhead across more superkmers.
pub fn read_sequence_chunks_sized(
path: &str,
block_size: usize,
) -> io::Result<SeqChunkIter<MimeTypeGuesser<Box<dyn Read + Send>>>> { ) -> io::Result<SeqChunkIter<MimeTypeGuesser<Box<dyn Read + Send>>>> {
let input = match xopen(path) { let input = match xopen(path) {
Ok(mut i) => match i.mime_type() { Ok(mut i) => match i.mime_type() {
Some("text/fasta") => fasta_chunks(i), Some("text/fasta") => SeqChunkIter::new(i, block_size,
Some("text/fastq") => fastq_chunks(i), fasta::end_of_last_fasta_entry, Some("text/fasta")),
Some("text/fastq") => SeqChunkIter::new(i, block_size,
fastq::end_of_last_fastq_entry, Some("text/fastq")),
_ => { _ => {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
+1 -1
View File
@@ -18,7 +18,7 @@ pub mod xopen;
pub use chunk::{ pub use chunk::{
SeqChunkIter, fasta_chunks, fastq_chunks, read_fasta_chunks, read_fastq_chunks, SeqChunkIter, fasta_chunks, fastq_chunks, read_fasta_chunks, read_fastq_chunks,
read_sequence_chunks, read_sequence_chunks, read_sequence_chunks_sized,
}; };
pub use mimetype::MimeTypeGuesser; pub use mimetype::MimeTypeGuesser;
pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk}; pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk};
+1
View File
@@ -5,3 +5,4 @@ edition = "2024"
[dependencies] [dependencies]
libc = "0.2" libc = "0.2"
sysinfo = "0.33"
+15
View File
@@ -2,6 +2,21 @@ use std::fmt;
use std::time::Instant; use std::time::Instant;
use libc::{RUSAGE_SELF, getrusage, rusage, timeval}; use libc::{RUSAGE_SELF, getrusage, rusage, timeval};
use sysinfo::System;
// ── Memory query ──────────────────────────────────────────────────────────────
/// Returns the number of bytes available for allocation on this machine.
///
/// On macOS, `available_memory()` can return 0 when the memory compressor
/// inflates the page count; in that case we fall back to half of total memory.
pub fn available_memory_bytes() -> u64 {
let sys = System::new_all();
match sys.available_memory() {
0 => sys.total_memory() / 2,
n => n,
}
}
// ── raw helpers ─────────────────────────────────────────────────────────────── // ── raw helpers ───────────────────────────────────────────────────────────────