refactor: restructure k-mer partitioning pipeline for memory efficiency

Replace in-memory hashing with a disk-backed external merge sort and `PersistentCompactIntVec` to drastically reduce peak RAM. Unify both phases using a custom `PtrHash` MPHF, eliminating `GOFunction` and `boomphf`. Introduce a concrete three-step `count_partition()` pipeline with adaptive chunk sizing based on available system memory. Update dependencies to `memmap2`, `ptr_hash`, and `obicompactvec`. Additionally, document strict genomics-only memory constraints and enforce an architectural feedback workflow requiring explicit user authorization before structural changes.
This commit is contained in:
Eric Coissac
2026-05-17 15:34:44 +08:00
parent f36b095ce2
commit 4736a7b6de
10 changed files with 230 additions and 114 deletions
+4 -1
View File
@@ -20,5 +20,8 @@ sysinfo = "0.33"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tracing = "0.1.44"
ph = "0.11"
cacheline-ef = "1.1"
epserde = "0.8"
memmap2 = "0.9.10"
obicompactvec = { path = "../obicompactvec" }
ptr_hash = "1.1"
+126
View File
@@ -0,0 +1,126 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fs;
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
use obiskio::{SKFileReader, SKResult};
/// Extract all canonical kmers from a dereplicated superkmer file,
/// sort them with an external merge sort, deduplicate, and write
/// unique u64 values to `unique_path`. Returns f0 (distinct kmers).
pub fn sort_unique_kmers(
dedup_path: &Path,
work_dir: &Path,
unique_path: &Path,
chunk_kmers: usize,
) -> SKResult<usize> {
let chunk_paths = extract_and_sort_chunks(dedup_path, work_dir, chunk_kmers)?;
if chunk_paths.is_empty() {
fs::write(unique_path, [])?;
return Ok(0);
}
let f0 = merge_and_dedup(&chunk_paths, unique_path)?;
for p in &chunk_paths {
fs::remove_file(p)?;
}
Ok(f0)
}
/// Number of kmers per sort chunk from available RAM (per thread).
pub fn chunk_size_from_ram(available_bytes: u64) -> usize {
// 40% of available RAM; each kmer is 8 bytes (u64)
let n = ((available_bytes as f64 * 0.40) / 8.0) as usize;
n.max(1 << 20) // minimum 1M kmers (~8 MB)
}
// ── private ───────────────────────────────────────────────────────────────────
fn extract_and_sort_chunks(
dedup_path: &Path,
work_dir: &Path,
chunk_kmers: usize,
) -> SKResult<Vec<PathBuf>> {
let mut reader = SKFileReader::open(dedup_path)?;
let mut buf: Vec<u64> = Vec::with_capacity(chunk_kmers);
let mut paths: Vec<PathBuf> = Vec::new();
while let Some(sk) = reader.read()? {
for kmer in sk.iter_canonical_kmers() {
buf.push(kmer.raw());
if buf.len() >= chunk_kmers {
paths.push(flush_sorted_chunk(&mut buf, work_dir, paths.len())?);
}
}
}
if !buf.is_empty() {
paths.push(flush_sorted_chunk(&mut buf, work_dir, paths.len())?);
}
Ok(paths)
}
fn flush_sorted_chunk(buf: &mut Vec<u64>, work_dir: &Path, idx: usize) -> io::Result<PathBuf> {
buf.sort_unstable();
let path = work_dir.join(format!("kmer_sort_{idx:05}.bin"));
let mut w = BufWriter::new(fs::File::create(&path)?);
for &v in buf.iter() {
w.write_all(&v.to_le_bytes())?;
}
buf.clear();
Ok(path)
}
/// K-way merge of sorted chunk files + in-line dedup → `unique_path`.
fn merge_and_dedup(chunk_paths: &[PathBuf], unique_path: &Path) -> io::Result<usize> {
struct ChunkReader {
mmap: Mmap,
pos: usize,
}
impl ChunkReader {
fn open(path: &Path) -> io::Result<Self> {
let f = fs::File::open(path)?;
Ok(Self { mmap: unsafe { Mmap::map(&f)? }, pos: 0 })
}
fn peek(&self) -> Option<u64> {
let off = self.pos * 8;
if off + 8 <= self.mmap.len() {
Some(u64::from_le_bytes(self.mmap[off..off + 8].try_into().unwrap()))
} else {
None
}
}
fn advance(&mut self) { self.pos += 1; }
}
let mut readers: Vec<ChunkReader> = chunk_paths.iter()
.map(|p| ChunkReader::open(p))
.collect::<io::Result<_>>()?;
let mut heap: BinaryHeap<Reverse<(u64, usize)>> = BinaryHeap::new();
for (i, r) in readers.iter().enumerate() {
if let Some(v) = r.peek() {
heap.push(Reverse((v, i)));
}
}
let mut w = BufWriter::new(fs::File::create(unique_path)?);
let mut f0 = 0usize;
let mut prev: Option<u64> = None;
while let Some(Reverse((val, idx))) = heap.pop() {
if prev != Some(val) {
w.write_all(&val.to_le_bytes())?;
prev = Some(val);
f0 += 1;
}
readers[idx].advance();
if let Some(next_val) = readers[idx].peek() {
heap.push(Reverse((next_val, idx)));
}
}
Ok(f0)
}
+1
View File
@@ -1,3 +1,4 @@
mod kmer_sort;
mod partition;
pub use partition::KmerPartition;
+49 -97
View File
@@ -1,25 +1,29 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap};
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use tracing::{debug, info};
use memmap2::MmapMut;
use obikseq::kmer::CanonicalKmer;
use ph::fmph::GOFunction;
use sysinfo::System;
use remove_dir_all::remove_dir_all;
use niffler::Level;
use niffler::send::compression::Format;
use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::ser::Serialize as EpSerialize;
use memmap2::Mmap;
use obicompactvec::PersistentCompactIntVecBuilder;
use obikseq::RoutableSuperKmer;
use obikseq::Sequence;
use obikseq::superkmer::SuperKmer;
use obiskio::{SKFileMeta, SKFileReader, SKFileWriter, SKResult};
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
use rayon::prelude::*;
use remove_dir_all::remove_dir_all;
use serde::{Deserialize, Serialize};
use sysinfo::System;
use niffler::Level;
use niffler::send::compression::Format;
use crate::kmer_sort::{chunk_size_from_ram, sort_unique_kmers};
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
const META_FILENAME: &str = "partition.meta";
const SK_EXT: &str = "skmer.zst";
@@ -243,7 +247,14 @@ impl KmerPartition {
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
pub fn count_kmer(&self) -> SKResult<()> {
let root = &self.root_path;
let k = self.kmer_size;
let sys = System::new_all();
let available = match sys.available_memory() {
0 => sys.total_memory() / 2,
n => n,
};
let n_threads = rayon::current_num_threads().max(1) as u64;
let chunk_kmers = chunk_size_from_ram(available / n_threads);
let results: Vec<SKResult<()>> = (0..self.n_partitions)
.into_par_iter()
@@ -254,7 +265,7 @@ impl KmerPartition {
return Ok(());
}
info!("counting kmers in partition {}/{}", i, self.n_partitions);
count_partition(&dir, &dedup_path, k)
count_partition(&dir, &dedup_path, chunk_kmers)
})
.collect();
for r in results {
@@ -492,107 +503,49 @@ fn flush_map(map: HashMap<SuperKmer, u64>, writer: &mut SKFileWriter) -> SKResul
Ok(())
}
/// Build the provisional MPHF and count file for one partition directory.
fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> {
// Estimate number of kmers from sidecar to pre-allocate the HashSet.
let capacity = SKFileMeta::read(dedup_path)
.ok()
.flatten()
.map(|m| {
let km1 = (k as u64).saturating_sub(1);
m.length_sum.saturating_sub(m.instances.saturating_mul(km1)) as usize
})
.unwrap_or(0);
debug!("{}: sidecar capacity estimate={capacity}", dir.display());
fn build_mphf(unique_path: &Path, f0: usize) -> io::Result<Mphf> {
let file = fs::File::open(unique_path)?;
let mmap = unsafe { Mmap::map(&file)? };
let kmers: &[u64] = unsafe {
std::slice::from_raw_parts(mmap.as_ptr() as *const u64, f0)
};
Ok(Mphf::new_from_par_iter(f0, kmers.par_iter().copied(), PtrHashParams::<CubicEps>::default()))
}
// Pass 1: collect all unique canonical kmers.
let mut seen: HashSet<CanonicalKmer> = HashSet::with_capacity(capacity);
let mut pass1_superkmers: u64 = 0;
{
let mut reader = SKFileReader::open(dedup_path)?;
while let Some(sk) = reader.read()? {
pass1_superkmers += 1;
for kmer in sk.iter_canonical_kmers() {
seen.insert(kmer);
}
}
}
let kmers: Vec<CanonicalKmer> = seen.into_iter().collect();
let n_kmers = kmers.len();
debug!(
"{}: pass1 superkmers={pass1_superkmers} unique_kmers={n_kmers}",
dir.display()
);
if n_kmers == 0 {
fn count_partition(dir: &Path, dedup_path: &Path, chunk_kmers: usize) -> SKResult<()> {
let unique_path = dir.join("sorted_unique.bin");
let f0 = sort_unique_kmers(dedup_path, dir, &unique_path, chunk_kmers)?;
if f0 == 0 {
return Ok(());
}
debug!("{}: f0={f0} unique kmers sorted", dir.display());
// Build provisional MPHF.
let mphf = GOFunction::from(kmers);
debug!("{}: MPHF built len={}", dir.display(), mphf.len());
let mphf = build_mphf(&unique_path, f0)?;
fs::remove_file(&unique_path)?;
// Create memory-mapped count file (u32 per slot, zero-initialised).
let counts_path = dir.join("counts1.bin");
let counts_file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&counts_path)?;
counts_file.set_len((n_kmers * std::mem::size_of::<u32>()) as u64)?;
let mut mmap = unsafe { MmapMut::map_mut(&counts_file)? };
mmap.fill(0);
let mut builder = PersistentCompactIntVecBuilder::new(f0, &counts_path)?;
// Pass 2: accumulate superkmer counts into the mmap'd array.
let mut pass2_superkmers: u64 = 0;
let mut pass2_kmer_hits: u64 = 0;
let mut pass2_kmer_misses: u64 = 0;
let mut pass2_count_sum: u64 = 0;
{
let counts =
unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr() as *mut u32, n_kmers) };
let mut reader = SKFileReader::open(dedup_path)?;
while let Some(sk) = reader.read()? {
pass2_superkmers += 1;
let seql = sk.seql();
let sk_count = sk.count();
if pass2_superkmers <= 3 {
debug!(
"{}: sk#{pass2_superkmers} seql={seql} count={sk_count}",
dir.display()
);
}
if seql < k {
continue;
}
pass2_count_sum += sk_count as u64;
for kmer in sk.iter_canonical_kmers() {
if let Some(idx) = mphf.get(&kmer) {
counts[idx as usize] = counts[idx as usize].saturating_add(sk_count);
pass2_kmer_hits += 1;
} else {
pass2_kmer_misses += 1;
}
let slot = mphf.index(&kmer.raw());
builder.set(slot, builder.get(slot).saturating_add(sk_count));
}
}
}
debug!(
"{}: pass2 superkmers={pass2_superkmers} hits={pass2_kmer_hits} misses={pass2_kmer_misses} count_sum={pass2_count_sum}",
dir.display()
);
mmap.flush()?;
// Build kmer frequency spectrum from the count array.
let counts = unsafe { std::slice::from_raw_parts(mmap.as_ptr() as *const u32, n_kmers) };
let mut spectrum: BTreeMap<u32, u64> = BTreeMap::new();
for &c in counts {
for slot in 0..f0 {
let c = builder.get(slot);
if c > 0 {
*spectrum.entry(c).or_insert(0) += 1;
}
}
let f0 = n_kmers as u64;
let f1: u64 = spectrum.iter().map(|(&c, &f)| c as u64 * f).sum();
builder.close()?;
let spectrum_map: BTreeMap<String, u64> = spectrum
.iter()
@@ -600,13 +553,12 @@ fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> {
.collect();
serde_json::to_writer_pretty(
fs::File::create(dir.join("kmer_spectrum_raw.json"))?,
&serde_json::json!({ "f0": f0, "f1": f1, "spectrum": &spectrum_map }),
&serde_json::json!({ "f0": f0 as u64, "f1": f1, "spectrum": &spectrum_map }),
)
.map_err(io::Error::other)?;
// Persist MPHF to disk.
let mphf_path = dir.join("mphf1.bin");
mphf.write(&mut fs::File::create(&mphf_path)?)?;
EpSerialize::store(&mphf, &dir.join("mphf1.bin"))
.map_err(|e| io::Error::other(e.to_string()))?;
Ok(())
}
@@ -676,7 +628,7 @@ mod tests {
if !dedup_path.exists() {
return (0, 0);
}
count_partition(&part_dir, &dedup_path, K).unwrap();
count_partition(&part_dir, &dedup_path, 1 << 20).unwrap();
let spec: serde_json::Value = serde_json::from_reader(
fs::File::open(part_dir.join("kmer_spectrum_raw.json")).unwrap(),