From 4736a7b6de7c6ffd5c03633edef1ba855695b19a Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sun, 17 May 2026 15:34:44 +0800 Subject: [PATCH] refactor: restructure k-mer partitioning pipeline for memory efficiency Replace in-memory hashing with a disk-backed external merge sort and `PersistentCompactIntVec` to drastically reduce peak RAM. Unify both phases using a custom `PtrHash` MPHF, eliminating `GOFunction` and `boomphf`. Introduce a concrete three-step `count_partition()` pipeline with adaptive chunk sizing based on available system memory. Update dependencies to `memmap2`, `ptr_hash`, and `obicompactvec`. Additionally, document strict genomics-only memory constraints and enforce an architectural feedback workflow requiring explicit user authorization before structural changes. --- docmd/implementation/mphf.md | 23 ++-- docmd/implementation/pipeline.md | 8 +- memory/MEMORY.md | 4 + memory/feedback_architectural_decisions.md | 17 +++ src/Cargo.lock | 12 +- src/Cargo.toml | 2 +- src/obikpartitionner/Cargo.toml | 5 +- src/obikpartitionner/src/kmer_sort.rs | 126 ++++++++++++++++++ src/obikpartitionner/src/lib.rs | 1 + src/obikpartitionner/src/partition.rs | 146 +++++++-------------- 10 files changed, 230 insertions(+), 114 deletions(-) create mode 100644 memory/MEMORY.md create mode 100644 memory/feedback_architectural_decisions.md create mode 100644 src/obikpartitionner/src/kmer_sort.rs diff --git a/docmd/implementation/mphf.md b/docmd/implementation/mphf.md index 3c852b0..f1a7242 100644 --- a/docmd/implementation/mphf.md +++ b/docmd/implementation/mphf.md @@ -6,20 +6,20 @@ Kmer indexing per partition proceeds in two phases. The separation is necessary ### Phase 1 — provisional MPHF + kmer spectrum -Implemented in `obikpartitionner::KmerPartition::count_kmer()`. +Implemented in `obikpartitionner::KmerPartition::count_kmer()` → `count_partition()`. -1. **Pass 1**: read the dereplicated superkmer file; enumerate all unique canonical kmers into a `HashSet`. Exact count known after this pass. -2. **Build a provisional MPHF** (`GOFunction` from the `ph` crate) over the exact kmer set. Produces `mphf1.bin`. -3. **Create `counts1.bin`**: one zero-initialised `u32` per MPHF slot (mmap'd). -4. **Pass 2**: re-read the dereplicated file; for each kmer, query `mphf1.get(kmer)` and atomically accumulate the superkmer count into `counts1[slot]`. +1. **External sort**: read the dereplicated superkmer file; extract the raw `u64` canonical kmer value for every kmer of every superkmer. Sort in RAM-bounded chunks (adaptive budget: 40% of available RAM ÷ n_threads, minimum 1 M kmers per chunk), then k-way merge with inline dedup. Result: `sorted_unique.bin` — a flat array of f0 distinct sorted `u64` values. Exact kmer count f0 is known at this point. +2. **Build provisional MPHF** (ptr_hash, same configuration as phase 2) over `sorted_unique.bin` using `new_from_par_iter`. Delete `sorted_unique.bin` immediately after. Persist to `mphf1.bin`. +3. **Create `counts1.bin`**: `PersistentCompactIntVec` with f0 slots, zero-initialised. +4. **Accumulation pass**: re-read the dereplicated superkmer file; for each kmer in each superkmer, compute `slot = mphf.index(kmer.raw())` and increment `counts1[slot]` by the superkmer's COUNT. 5. **Build kmer frequency spectrum** from `counts1`: histogram `{count → n_kmers}`, totals f0 (distinct kmers) and f1 (total abundance). Written to `kmer_spectrum_raw.json` per partition, then merged globally. Files produced per partition: ``` part_XXXXX/ - mphf1.bin — GOFunction (provisional MPHF, discarded after phase 2) - counts1.bin — [u32; n_kmers] kmer counts, mmap'd + mphf1.bin — ptr_hash provisional MPHF (discarded after phase 2) + counts1.bin — PersistentCompactIntVec, f0 × u32 kmer counts kmer_spectrum_raw.json — local frequency spectrum ``` @@ -53,16 +53,13 @@ After filtering (applying a min-count threshold derived from the spectrum) and b **FMPH/FMPHGO** (`ph` crate, Beling, ACM JEA 2023): - ~2.1 bits/key — most compact; good query speed; deterministic construction -- Works well from an exact or slightly overestimated count -- `GOFunction` (group-oriented variant) is the specific type used +- `GOFunction` (group-oriented variant) was the original phase-1 choice; eliminated when the external sort made the exact count available at phase 1 as well ## MPHF choice per phase -**Phase 1** (provisional, discarded after spectrum computation): `ph::fmph::GOFunction`. Compact, fast to build from the exact post-dedup kmer set. Query speed is secondary — the structure is only used during pass 2 of `count_kmer`. +**Both phases**: **ptr_hash**, same type alias and construction parameters. The external sort (phase 1) and the unitig index (phase 2) both provide the exact key count before MPHF construction, so ptr_hash's requirement is satisfied in both cases. Using a single MPHF implementation removes the `ph` crate dependency. -**Phase 2** (persistent, queried repeatedly): **ptr_hash**. Exact key count is available from the unitig index; ptr_hash query speed (≥2.1×) and construction speed (≥3.1× over FMPH) are the decisive factors. The 2.4 bits/key overhead is acceptable. - -boomphf is eliminated: largest space overhead, streaming advantage does not apply. +boomphf: eliminated — largest space overhead, streaming advantage no longer needed. FMPH/GOFunction: eliminated — exact count available, ptr_hash is faster at equivalent compactness. --- diff --git a/docmd/implementation/pipeline.md b/docmd/implementation/pipeline.md index 0c49f88..af9af7f 100644 --- a/docmd/implementation/pipeline.md +++ b/docmd/implementation/pipeline.md @@ -82,7 +82,13 @@ for each super-kmer (sequence, COUNT): kmer_counts[canonical(kmer)] += COUNT ``` -Implemented as an external sort or a temporary HashMap, depending on partition size. At the end of this phase, each distinct canonical kmer has its exact total count. +Implemented as a three-step pipeline in `count_partition()`: + +1. **External sort** (`kmer_sort::sort_unique_kmers`): read dereplicated superkmers, extract canonical kmer raw `u64` values, sort in RAM-bounded chunks (adaptive: 40% of available RAM ÷ n_threads, min 1 M kmers/chunk), k-way merge with inline dedup → `sorted_unique.bin`. f0 is now known exactly. +2. **Provisional MPHF** (ptr_hash): built from `sorted_unique.bin` via `new_from_par_iter(f0, ...)`. Stored to `mphf1.bin`; `sorted_unique.bin` deleted immediately. +3. **Accumulation pass**: re-read dereplicated superkmers; for each kmer, `slot = mphf.index(kmer.raw())`, increment `counts1[slot]` by the superkmer COUNT. Stored in a `PersistentCompactIntVec` (`counts1.bin`). + +At the end of this phase, each distinct canonical kmer has its exact total count, and the frequency spectrum (`kmer_spectrum_raw.json`) is written. Abundance filter applied here: kmers with `total_count < q` are discarded. `q` is a collection parameter (0 = keep all, including singletons for ≤1x data). diff --git a/memory/MEMORY.md b/memory/MEMORY.md new file mode 100644 index 0000000..1e068c8 --- /dev/null +++ b/memory/MEMORY.md @@ -0,0 +1,4 @@ +# Memory Index + +- [Project domain](project_domain.md) — obikmer est pour la génomique (génomes individuels), pas la métagénomique +- [No architectural decisions without authorization](feedback_architectural_decisions.md) — toute décision architecturale (mémoire, algo, structure) requiert l'accord explicite de l'utilisateur avant toute action diff --git a/memory/feedback_architectural_decisions.md b/memory/feedback_architectural_decisions.md new file mode 100644 index 0000000..02c80fe --- /dev/null +++ b/memory/feedback_architectural_decisions.md @@ -0,0 +1,17 @@ +--- +name: No architectural decisions without explicit authorization +description: Never make architectural or design decisions without explicit user approval — code decisions are the user's alone +type: feedback +--- + +Never make architectural decisions unilaterally. This includes: +- Memory layout or footprint changes +- Algorithm or data structure choices (HashSet vs streaming, etc.) +- Dependency additions or substitutions +- Structural refactors that go beyond the exact task requested + +If a bug or inefficiency is observed, **report it and propose alternatives** — do not fix it without explicit authorization. + +**Why:** The user optimizes for minimal memory footprint at all times. Introducing a HashSet in `count_kmer()` (replacing the intended streaming GOFunction construction from the sidecar estimate) caused a serious memory regression that went unreported. This is inadmissible on a project where memory efficiency is a core constraint. + +**How to apply:** When editing code and noticing an architectural issue (even a clear improvement), stop, describe the problem and options, and wait for explicit go-ahead before touching anything. diff --git a/src/Cargo.lock b/src/Cargo.lock index b1de6b4..904168b 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1745,14 +1745,17 @@ dependencies = [ name = "obikpartitionner" version = "0.1.0" dependencies = [ + "cacheline-ef", + "epserde 0.8.0", "memmap2", "niffler 3.0.0", + "obicompactvec", "obikrope", "obikseq", "obiread", "obiskbuilder", "obiskio", - "ph", + "ptr_hash", "rayon", "remove_dir_all", "serde", @@ -1842,6 +1845,13 @@ dependencies = [ "tempfile", ] +[[package]] +name = "obisys" +version = "0.1.0" +dependencies = [ + "libc", +] + [[package]] name = "objc2-core-foundation" version = "0.3.2" diff --git a/src/Cargo.toml b/src/Cargo.toml index 9f3fc56..5ffe180 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -1,5 +1,5 @@ [workspace] resolver = "3" -members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec"] +members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec", "obisys"] [profile.release] debug = 1 diff --git a/src/obikpartitionner/Cargo.toml b/src/obikpartitionner/Cargo.toml index 256cabc..853e630 100644 --- a/src/obikpartitionner/Cargo.toml +++ b/src/obikpartitionner/Cargo.toml @@ -20,5 +20,8 @@ sysinfo = "0.33" serde = { version = "1", features = ["derive"] } serde_json = "1" tracing = "0.1.44" -ph = "0.11" +cacheline-ef = "1.1" +epserde = "0.8" memmap2 = "0.9.10" +obicompactvec = { path = "../obicompactvec" } +ptr_hash = "1.1" diff --git a/src/obikpartitionner/src/kmer_sort.rs b/src/obikpartitionner/src/kmer_sort.rs new file mode 100644 index 0000000..1bf8fc6 --- /dev/null +++ b/src/obikpartitionner/src/kmer_sort.rs @@ -0,0 +1,126 @@ +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::fs; +use std::io::{self, BufWriter, Write}; +use std::path::{Path, PathBuf}; + +use memmap2::Mmap; +use obiskio::{SKFileReader, SKResult}; + +/// Extract all canonical kmers from a dereplicated superkmer file, +/// sort them with an external merge sort, deduplicate, and write +/// unique u64 values to `unique_path`. Returns f0 (distinct kmers). +pub fn sort_unique_kmers( + dedup_path: &Path, + work_dir: &Path, + unique_path: &Path, + chunk_kmers: usize, +) -> SKResult { + let chunk_paths = extract_and_sort_chunks(dedup_path, work_dir, chunk_kmers)?; + + if chunk_paths.is_empty() { + fs::write(unique_path, [])?; + return Ok(0); + } + + let f0 = merge_and_dedup(&chunk_paths, unique_path)?; + for p in &chunk_paths { + fs::remove_file(p)?; + } + Ok(f0) +} + +/// Number of kmers per sort chunk from available RAM (per thread). +pub fn chunk_size_from_ram(available_bytes: u64) -> usize { + // 40% of available RAM; each kmer is 8 bytes (u64) + let n = ((available_bytes as f64 * 0.40) / 8.0) as usize; + n.max(1 << 20) // minimum 1M kmers (~8 MB) +} + +// ── private ─────────────────────────────────────────────────────────────────── + +fn extract_and_sort_chunks( + dedup_path: &Path, + work_dir: &Path, + chunk_kmers: usize, +) -> SKResult> { + let mut reader = SKFileReader::open(dedup_path)?; + let mut buf: Vec = Vec::with_capacity(chunk_kmers); + let mut paths: Vec = Vec::new(); + + while let Some(sk) = reader.read()? { + for kmer in sk.iter_canonical_kmers() { + buf.push(kmer.raw()); + if buf.len() >= chunk_kmers { + paths.push(flush_sorted_chunk(&mut buf, work_dir, paths.len())?); + } + } + } + if !buf.is_empty() { + paths.push(flush_sorted_chunk(&mut buf, work_dir, paths.len())?); + } + Ok(paths) +} + +fn flush_sorted_chunk(buf: &mut Vec, work_dir: &Path, idx: usize) -> io::Result { + buf.sort_unstable(); + let path = work_dir.join(format!("kmer_sort_{idx:05}.bin")); + let mut w = BufWriter::new(fs::File::create(&path)?); + for &v in buf.iter() { + w.write_all(&v.to_le_bytes())?; + } + buf.clear(); + Ok(path) +} + +/// K-way merge of sorted chunk files + in-line dedup → `unique_path`. +fn merge_and_dedup(chunk_paths: &[PathBuf], unique_path: &Path) -> io::Result { + struct ChunkReader { + mmap: Mmap, + pos: usize, + } + impl ChunkReader { + fn open(path: &Path) -> io::Result { + let f = fs::File::open(path)?; + Ok(Self { mmap: unsafe { Mmap::map(&f)? }, pos: 0 }) + } + fn peek(&self) -> Option { + let off = self.pos * 8; + if off + 8 <= self.mmap.len() { + Some(u64::from_le_bytes(self.mmap[off..off + 8].try_into().unwrap())) + } else { + None + } + } + fn advance(&mut self) { self.pos += 1; } + } + + let mut readers: Vec = chunk_paths.iter() + .map(|p| ChunkReader::open(p)) + .collect::>()?; + + let mut heap: BinaryHeap> = BinaryHeap::new(); + for (i, r) in readers.iter().enumerate() { + if let Some(v) = r.peek() { + heap.push(Reverse((v, i))); + } + } + + let mut w = BufWriter::new(fs::File::create(unique_path)?); + let mut f0 = 0usize; + let mut prev: Option = None; + + while let Some(Reverse((val, idx))) = heap.pop() { + if prev != Some(val) { + w.write_all(&val.to_le_bytes())?; + prev = Some(val); + f0 += 1; + } + readers[idx].advance(); + if let Some(next_val) = readers[idx].peek() { + heap.push(Reverse((next_val, idx))); + } + } + + Ok(f0) +} diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 42e078b..a1e0b9f 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,3 +1,4 @@ +mod kmer_sort; mod partition; pub use partition::KmerPartition; diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index a55bbd9..cf6370f 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -1,25 +1,29 @@ -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::fs; use std::io; use std::path::{Path, PathBuf}; use tracing::{debug, info}; -use memmap2::MmapMut; -use obikseq::kmer::CanonicalKmer; -use ph::fmph::GOFunction; - -use sysinfo::System; - -use remove_dir_all::remove_dir_all; - -use niffler::Level; -use niffler::send::compression::Format; +use cacheline_ef::{CachelineEf, CachelineEfVec}; +use epserde::ser::Serialize as EpSerialize; +use memmap2::Mmap; +use obicompactvec::PersistentCompactIntVecBuilder; use obikseq::RoutableSuperKmer; use obikseq::Sequence; use obikseq::superkmer::SuperKmer; use obiskio::{SKFileMeta, SKFileReader, SKFileWriter, SKResult}; +use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; use rayon::prelude::*; +use remove_dir_all::remove_dir_all; use serde::{Deserialize, Serialize}; +use sysinfo::System; + +use niffler::Level; +use niffler::send::compression::Format; + +use crate::kmer_sort::{chunk_size_from_ram, sort_unique_kmers}; + +type Mphf = PtrHash>, Xx64, Vec>; const META_FILENAME: &str = "partition.meta"; const SK_EXT: &str = "skmer.zst"; @@ -243,7 +247,14 @@ impl KmerPartition { /// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously. pub fn count_kmer(&self) -> SKResult<()> { let root = &self.root_path; - let k = self.kmer_size; + + let sys = System::new_all(); + let available = match sys.available_memory() { + 0 => sys.total_memory() / 2, + n => n, + }; + let n_threads = rayon::current_num_threads().max(1) as u64; + let chunk_kmers = chunk_size_from_ram(available / n_threads); let results: Vec> = (0..self.n_partitions) .into_par_iter() @@ -254,7 +265,7 @@ impl KmerPartition { return Ok(()); } info!("counting kmers in partition {}/{}", i, self.n_partitions); - count_partition(&dir, &dedup_path, k) + count_partition(&dir, &dedup_path, chunk_kmers) }) .collect(); for r in results { @@ -492,107 +503,49 @@ fn flush_map(map: HashMap, writer: &mut SKFileWriter) -> SKResul Ok(()) } -/// Build the provisional MPHF and count file for one partition directory. -fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> { - // Estimate number of kmers from sidecar to pre-allocate the HashSet. - let capacity = SKFileMeta::read(dedup_path) - .ok() - .flatten() - .map(|m| { - let km1 = (k as u64).saturating_sub(1); - m.length_sum.saturating_sub(m.instances.saturating_mul(km1)) as usize - }) - .unwrap_or(0); - debug!("{}: sidecar capacity estimate={capacity}", dir.display()); +fn build_mphf(unique_path: &Path, f0: usize) -> io::Result { + let file = fs::File::open(unique_path)?; + let mmap = unsafe { Mmap::map(&file)? }; + let kmers: &[u64] = unsafe { + std::slice::from_raw_parts(mmap.as_ptr() as *const u64, f0) + }; + Ok(Mphf::new_from_par_iter(f0, kmers.par_iter().copied(), PtrHashParams::::default())) +} - // Pass 1: collect all unique canonical kmers. - let mut seen: HashSet = HashSet::with_capacity(capacity); - let mut pass1_superkmers: u64 = 0; - { - let mut reader = SKFileReader::open(dedup_path)?; - while let Some(sk) = reader.read()? { - pass1_superkmers += 1; - for kmer in sk.iter_canonical_kmers() { - seen.insert(kmer); - } - } - } - let kmers: Vec = seen.into_iter().collect(); - let n_kmers = kmers.len(); - debug!( - "{}: pass1 superkmers={pass1_superkmers} unique_kmers={n_kmers}", - dir.display() - ); - - if n_kmers == 0 { +fn count_partition(dir: &Path, dedup_path: &Path, chunk_kmers: usize) -> SKResult<()> { + let unique_path = dir.join("sorted_unique.bin"); + let f0 = sort_unique_kmers(dedup_path, dir, &unique_path, chunk_kmers)?; + if f0 == 0 { return Ok(()); } + debug!("{}: f0={f0} unique kmers sorted", dir.display()); - // Build provisional MPHF. - let mphf = GOFunction::from(kmers); - debug!("{}: MPHF built len={}", dir.display(), mphf.len()); + let mphf = build_mphf(&unique_path, f0)?; + fs::remove_file(&unique_path)?; - // Create memory-mapped count file (u32 per slot, zero-initialised). let counts_path = dir.join("counts1.bin"); - let counts_file = fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&counts_path)?; - counts_file.set_len((n_kmers * std::mem::size_of::()) as u64)?; - let mut mmap = unsafe { MmapMut::map_mut(&counts_file)? }; - mmap.fill(0); + let mut builder = PersistentCompactIntVecBuilder::new(f0, &counts_path)?; - // Pass 2: accumulate superkmer counts into the mmap'd array. - let mut pass2_superkmers: u64 = 0; - let mut pass2_kmer_hits: u64 = 0; - let mut pass2_kmer_misses: u64 = 0; - let mut pass2_count_sum: u64 = 0; { - let counts = - unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr() as *mut u32, n_kmers) }; let mut reader = SKFileReader::open(dedup_path)?; while let Some(sk) = reader.read()? { - pass2_superkmers += 1; - let seql = sk.seql(); let sk_count = sk.count(); - if pass2_superkmers <= 3 { - debug!( - "{}: sk#{pass2_superkmers} seql={seql} count={sk_count}", - dir.display() - ); - } - if seql < k { - continue; - } - pass2_count_sum += sk_count as u64; for kmer in sk.iter_canonical_kmers() { - if let Some(idx) = mphf.get(&kmer) { - counts[idx as usize] = counts[idx as usize].saturating_add(sk_count); - pass2_kmer_hits += 1; - } else { - pass2_kmer_misses += 1; - } + let slot = mphf.index(&kmer.raw()); + builder.set(slot, builder.get(slot).saturating_add(sk_count)); } } } - debug!( - "{}: pass2 superkmers={pass2_superkmers} hits={pass2_kmer_hits} misses={pass2_kmer_misses} count_sum={pass2_count_sum}", - dir.display() - ); - mmap.flush()?; - // Build kmer frequency spectrum from the count array. - let counts = unsafe { std::slice::from_raw_parts(mmap.as_ptr() as *const u32, n_kmers) }; let mut spectrum: BTreeMap = BTreeMap::new(); - for &c in counts { + for slot in 0..f0 { + let c = builder.get(slot); if c > 0 { *spectrum.entry(c).or_insert(0) += 1; } } - let f0 = n_kmers as u64; let f1: u64 = spectrum.iter().map(|(&c, &f)| c as u64 * f).sum(); + builder.close()?; let spectrum_map: BTreeMap = spectrum .iter() @@ -600,13 +553,12 @@ fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> { .collect(); serde_json::to_writer_pretty( fs::File::create(dir.join("kmer_spectrum_raw.json"))?, - &serde_json::json!({ "f0": f0, "f1": f1, "spectrum": &spectrum_map }), + &serde_json::json!({ "f0": f0 as u64, "f1": f1, "spectrum": &spectrum_map }), ) .map_err(io::Error::other)?; - // Persist MPHF to disk. - let mphf_path = dir.join("mphf1.bin"); - mphf.write(&mut fs::File::create(&mphf_path)?)?; + EpSerialize::store(&mphf, &dir.join("mphf1.bin")) + .map_err(|e| io::Error::other(e.to_string()))?; Ok(()) } @@ -676,7 +628,7 @@ mod tests { if !dedup_path.exists() { return (0, 0); } - count_partition(&part_dir, &dedup_path, K).unwrap(); + count_partition(&part_dir, &dedup_path, 1 << 20).unwrap(); let spec: serde_json::Value = serde_json::from_reader( fs::File::open(part_dir.join("kmer_spectrum_raw.json")).unwrap(),