diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs
index 4e6bd7c..c5ba267 100644
--- a/src/obikindex/src/index.rs
+++ b/src/obikindex/src/index.rs
@@ -1,17 +1,18 @@
+use std::collections::BTreeMap;
 use std::fs;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
 use indicatif::{ProgressBar, ProgressStyle};
-use obikpartitionner::KmerPartition;
+use obikpartitionner::{KmerPartition, KmerSpectrum};
 use obisys::{Reporter, Stage};
 use rayon::prelude::*;
 use tracing::info;
 
 use crate::error::{OKIError, OKIResult};
 use crate::meta::{IndexConfig, IndexMeta};
-use crate::state::{IndexState, SENTINEL_INDEXED, SENTINEL_SCATTERED};
+use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
 
 pub struct KmerIndex {
     root_path: PathBuf,
@@ -96,16 +97,57 @@ impl KmerIndex {
 
     /// Dereplicate all partitions then compute kmer counts.
     ///
-    /// Writes `kmer_spectrum_raw.json` at the index root upon completion
-    /// (this file doubles as the `Counted` sentinel).
-    pub fn dereplicate_and_count(&self, rep: &mut Reporter) -> OKIResult<()> {
+    /// Writes `spectrums/{label}.json` and touches `count.done` upon completion.
+    /// Per-partition spectrum files are removed unless `keep_intermediate` is true.
+    pub fn dereplicate_and_count(&self, keep_intermediate: bool, rep: &mut Reporter) -> OKIResult<()> {
         let t = Stage::start("dereplicate");
         self.partition.dereplicate()?;
         rep.push(t.stop());
 
         let t = Stage::start("count_kmer");
-        self.partition.count_kmer()?;
+        let spectrum = self.partition.count_kmer(keep_intermediate)?;
         rep.push(t.stop());
+
+        // NOTE(review): when `keep_intermediate` is false, `count_kmer` has already
+        // removed the per-partition spectrum files; if the write below fails, no
+        // spectrum is left on disk.
+        self.write_spectrum(&spectrum)?;
+        // Touch the sentinel last so it only exists once the spectrum is written.
+        touch(&self.root_path.join(SENTINEL_COUNTED))?;
+        Ok(())
+    }
+
+    /// Serialises `sp` as pretty-printed JSON to `spectrums/{label}.json` under
+    /// the index root, creating the `spectrums` directory if needed.
+    ///
+    /// `label` is the first entry of `meta.genomes`, or `"unknown"` when the list
+    /// is empty. Path separators in the label are replaced by `_` so the file is
+    /// always created directly inside `spectrums/`.
+    fn write_spectrum(&self, sp: &KmerSpectrum) -> OKIResult<()> {
+        let label = self
+            .meta
+            .genomes
+            .first()
+            .map_or("unknown", String::as_str)
+            .replace(['/', '\\'], "_");
+        let spectrums_dir = self.root_path.join("spectrums");
+        fs::create_dir_all(&spectrums_dir)?;
+        let path = spectrums_dir.join(format!("{label}.json"));
+        // Zero-padded keys make lexicographic key order match numeric order.
+        let spectrum_map: BTreeMap<_, _> = sp
+            .counts
+            .iter()
+            .map(|(&c, &f)| (format!("{c:010}"), f))
+            .collect();
+        // Buffer the serializer's output and flush explicitly so that a late write
+        // error is reported instead of being dropped silently with the writer.
+        let mut out = std::io::BufWriter::new(fs::File::create(&path)?);
+        serde_json::to_writer_pretty(
+            &mut out,
+            &serde_json::json!({ "f0": sp.f0, "f1": sp.f1, "spectrum": spectrum_map }),
+        )
+        .map_err(OKIError::Json)?;
+        std::io::Write::flush(&mut out)?;
         Ok(())
     }
 
diff --git a/src/obikindex/src/state.rs b/src/obikindex/src/state.rs
index a32d4d8..4db50ce 100644
--- a/src/obikindex/src/state.rs
+++ b/src/obikindex/src/state.rs
@@ -3,7 +3,7 @@ use std::path::Path;
 use crate::meta::META_FILENAME;
 
 pub const SENTINEL_SCATTERED: &str = "scatter.done";
-pub const SENTINEL_COUNTED: &str = "kmer_spectrum_raw.json";
+pub const SENTINEL_COUNTED: &str = "count.done";
 pub const SENTINEL_INDEXED: &str = "index.done";
 
 /// Progression state of a `KmerIndex`.
@@ -17,7 +17,7 @@ pub enum IndexState {
     Empty,
     /// `scatter.done` sentinel present — all super-kmers have been routed.
     Scattered,
-    /// `kmer_spectrum_raw.json` present — dereplicate + count complete.
+    /// `count.done` sentinel present — dereplicate + count complete.
     Counted,
     /// `index.done` sentinel present — layered MPHF index fully built.
Indexed, diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 089402c..9f9bad2 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -93,7 +93,7 @@ pub fn run(args: IndexArgs) { // ── Stage 2: dereplicate + count ───────────────────────────────────────── if idx.state() < IndexState::Counted { - idx.dereplicate_and_count(&mut rep).unwrap_or_else(|e| { + idx.dereplicate_and_count(args.keep_intermediate, &mut rep).unwrap_or_else(|e| { eprintln!("error: {e}"); std::process::exit(1); }); diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index bd7934f..2e94e56 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -2,4 +2,4 @@ mod index_layer; mod kmer_sort; mod partition; -pub use partition::{KmerPartition, PARTITIONS_SUBDIR}; +pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index 5740aa2..c668309 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -27,6 +27,12 @@ use crate::kmer_sort::{chunk_size_from_ram, sort_unique_kmers}; type Mphf = PtrHash>, Xx64, Vec>; +pub struct KmerSpectrum { + pub f0: u64, + pub f1: u64, + pub counts: BTreeMap, +} + const SK_EXT: &str = "skmer.zst"; pub const PARTITIONS_SUBDIR: &str = "partitions"; @@ -238,11 +244,13 @@ impl KmerPartition { /// 3. Writes a flat binary count file (`counts1.bin`, one `u32` per slot, /// memory-mapped) accumulating kmer abundances from the superkmer counts. /// 4. Persists the MPHF to `mphf1.bin` for downstream use. - /// 5. Writes a global `kmer_spectrum_raw.json` at the partition root. + /// + /// Returns the aggregated `KmerSpectrum`. Per-partition spectrum files are + /// deleted after aggregation unless `keep_partial` is true. /// /// Partitions are processed in parallel via Rayon (one task per thread). 
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously. - pub fn count_kmer(&self) -> SKResult<()> { + pub fn count_kmer(&self, keep_partial: bool) -> SKResult { let sys = System::new_all(); let available = match sys.available_memory() { 0 => sys.total_memory() / 2, @@ -282,10 +290,10 @@ impl KmerPartition { r?; } - // Aggregate per-partition spectra into a global one at the root. - let mut global_spectrum: BTreeMap = BTreeMap::new(); - let mut global_f0: u64 = 0; - let mut global_f1: u64 = 0; + // Aggregate per-partition spectra. + let mut counts: BTreeMap = BTreeMap::new(); + let mut f0: u64 = 0; + let mut f1: u64 = 0; for i in 0..self.n_partitions { let path = self.part_dir(i).join("kmer_spectrum_raw.json"); @@ -294,28 +302,21 @@ impl KmerPartition { } let v: serde_json::Value = serde_json::from_str(&fs::read_to_string(&path)?).map_err(io::Error::other)?; - global_f0 += v["f0"].as_u64().unwrap_or(0); - global_f1 += v["f1"].as_u64().unwrap_or(0); + f0 += v["f0"].as_u64().unwrap_or(0); + f1 += v["f1"].as_u64().unwrap_or(0); if let Some(obj) = v["spectrum"].as_object() { for (c_str, freq) in obj { if let (Ok(c), Some(f)) = (c_str.parse::(), freq.as_u64()) { - *global_spectrum.entry(c).or_insert(0) += f; + *counts.entry(c).or_insert(0) += f; } } } + if !keep_partial { + let _ = fs::remove_file(&path); + } } - let global_spectrum_map: BTreeMap = global_spectrum - .iter() - .map(|(&c, &f)| (format!("{c:010}"), f)) - .collect(); - serde_json::to_writer_pretty( - fs::File::create(self.root_path.join("kmer_spectrum_raw.json"))?, - &serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }), - ) - .map_err(io::Error::other)?; - - Ok(()) + Ok(KmerSpectrum { f0, f1, counts }) } // ── private ───────────────────────────────────────────────────────────────