refactor: replace root-level spectrum file with `spectrums/{label}.json` output
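Replace the single root-level `kmer_spectrum_raw.json` output with a `spectrums/{label}.json` file, where the label is the index's first genome name (or `unknown` if none is recorded). `count_kmer` now returns the aggregated `KmerSpectrum` instead of writing it, and a new `write_spectrum` helper handles serialization. Add a `keep_intermediate` parameter that controls whether the per-partition `kmer_spectrum_raw.json` files are kept after aggregation. Update the completion sentinel to `count.done` and align state documentation accordingly.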
This commit is contained in:
@@ -1,17 +1,18 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
use obikpartitionner::KmerPartition;
|
use obikpartitionner::{KmerPartition, KmerSpectrum};
|
||||||
use obisys::{Reporter, Stage};
|
use obisys::{Reporter, Stage};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use crate::error::{OKIError, OKIResult};
|
use crate::error::{OKIError, OKIResult};
|
||||||
use crate::meta::{IndexConfig, IndexMeta};
|
use crate::meta::{IndexConfig, IndexMeta};
|
||||||
use crate::state::{IndexState, SENTINEL_INDEXED, SENTINEL_SCATTERED};
|
use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
|
||||||
|
|
||||||
pub struct KmerIndex {
|
pub struct KmerIndex {
|
||||||
root_path: PathBuf,
|
root_path: PathBuf,
|
||||||
@@ -96,16 +97,37 @@ impl KmerIndex {
|
|||||||
|
|
||||||
/// Dereplicate all partitions then compute kmer counts.
|
/// Dereplicate all partitions then compute kmer counts.
|
||||||
///
|
///
|
||||||
/// Writes `kmer_spectrum_raw.json` at the index root upon completion
|
/// Writes `spectrums/{label}.json` and touches `count.done` upon completion.
|
||||||
/// (this file doubles as the `Counted` sentinel).
|
/// Per-partition spectrum files are removed unless `keep_intermediate` is true.
|
||||||
pub fn dereplicate_and_count(&self, rep: &mut Reporter) -> OKIResult<()> {
|
pub fn dereplicate_and_count(&self, keep_intermediate: bool, rep: &mut Reporter) -> OKIResult<()> {
|
||||||
let t = Stage::start("dereplicate");
|
let t = Stage::start("dereplicate");
|
||||||
self.partition.dereplicate()?;
|
self.partition.dereplicate()?;
|
||||||
rep.push(t.stop());
|
rep.push(t.stop());
|
||||||
|
|
||||||
let t = Stage::start("count_kmer");
|
let t = Stage::start("count_kmer");
|
||||||
self.partition.count_kmer()?;
|
let spectrum = self.partition.count_kmer(keep_intermediate)?;
|
||||||
rep.push(t.stop());
|
rep.push(t.stop());
|
||||||
|
|
||||||
|
self.write_spectrum(&spectrum)?;
|
||||||
|
touch(&self.root_path.join(SENTINEL_COUNTED))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_spectrum(&self, sp: &KmerSpectrum) -> OKIResult<()> {
|
||||||
|
let label = self.meta.genomes.first().map(String::as_str).unwrap_or("unknown");
|
||||||
|
let spectrums_dir = self.root_path.join("spectrums");
|
||||||
|
fs::create_dir_all(&spectrums_dir)?;
|
||||||
|
let path = spectrums_dir.join(format!("{label}.json"));
|
||||||
|
let spectrum_map: BTreeMap<String, u64> = sp.counts
|
||||||
|
.iter()
|
||||||
|
.map(|(&c, &f)| (format!("{c:010}"), f))
|
||||||
|
.collect();
|
||||||
|
let f = fs::File::create(&path)?;
|
||||||
|
serde_json::to_writer_pretty(
|
||||||
|
f,
|
||||||
|
&serde_json::json!({ "f0": sp.f0, "f1": sp.f1, "spectrum": spectrum_map }),
|
||||||
|
)
|
||||||
|
.map_err(OKIError::Json)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use std::path::Path;
|
|||||||
use crate::meta::META_FILENAME;
|
use crate::meta::META_FILENAME;
|
||||||
|
|
||||||
pub const SENTINEL_SCATTERED: &str = "scatter.done";
|
pub const SENTINEL_SCATTERED: &str = "scatter.done";
|
||||||
pub const SENTINEL_COUNTED: &str = "kmer_spectrum_raw.json";
|
pub const SENTINEL_COUNTED: &str = "count.done";
|
||||||
pub const SENTINEL_INDEXED: &str = "index.done";
|
pub const SENTINEL_INDEXED: &str = "index.done";
|
||||||
|
|
||||||
/// Progression state of a `KmerIndex`.
|
/// Progression state of a `KmerIndex`.
|
||||||
@@ -17,7 +17,7 @@ pub enum IndexState {
|
|||||||
Empty,
|
Empty,
|
||||||
/// `scatter.done` sentinel present — all super-kmers have been routed.
|
/// `scatter.done` sentinel present — all super-kmers have been routed.
|
||||||
Scattered,
|
Scattered,
|
||||||
/// `kmer_spectrum_raw.json` present — dereplicate + count complete.
|
/// `count.done` sentinel present — dereplicate + count complete.
|
||||||
Counted,
|
Counted,
|
||||||
/// `index.done` sentinel present — layered MPHF index fully built.
|
/// `index.done` sentinel present — layered MPHF index fully built.
|
||||||
Indexed,
|
Indexed,
|
||||||
|
|||||||
@@ -93,7 +93,7 @@ pub fn run(args: IndexArgs) {
|
|||||||
|
|
||||||
// ── Stage 2: dereplicate + count ─────────────────────────────────────────
|
// ── Stage 2: dereplicate + count ─────────────────────────────────────────
|
||||||
if idx.state() < IndexState::Counted {
|
if idx.state() < IndexState::Counted {
|
||||||
idx.dereplicate_and_count(&mut rep).unwrap_or_else(|e| {
|
idx.dereplicate_and_count(args.keep_intermediate, &mut rep).unwrap_or_else(|e| {
|
||||||
eprintln!("error: {e}");
|
eprintln!("error: {e}");
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -2,4 +2,4 @@ mod index_layer;
|
|||||||
mod kmer_sort;
|
mod kmer_sort;
|
||||||
mod partition;
|
mod partition;
|
||||||
|
|
||||||
pub use partition::{KmerPartition, PARTITIONS_SUBDIR};
|
pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR};
|
||||||
|
|||||||
@@ -27,6 +27,12 @@ use crate::kmer_sort::{chunk_size_from_ram, sort_unique_kmers};
|
|||||||
|
|
||||||
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
||||||
|
|
||||||
|
pub struct KmerSpectrum {
|
||||||
|
pub f0: u64,
|
||||||
|
pub f1: u64,
|
||||||
|
pub counts: BTreeMap<u32, u64>,
|
||||||
|
}
|
||||||
|
|
||||||
const SK_EXT: &str = "skmer.zst";
|
const SK_EXT: &str = "skmer.zst";
|
||||||
pub const PARTITIONS_SUBDIR: &str = "partitions";
|
pub const PARTITIONS_SUBDIR: &str = "partitions";
|
||||||
|
|
||||||
@@ -238,11 +244,13 @@ impl KmerPartition {
|
|||||||
/// 3. Writes a flat binary count file (`counts1.bin`, one `u32` per slot,
|
/// 3. Writes a flat binary count file (`counts1.bin`, one `u32` per slot,
|
||||||
/// memory-mapped) accumulating kmer abundances from the superkmer counts.
|
/// memory-mapped) accumulating kmer abundances from the superkmer counts.
|
||||||
/// 4. Persists the MPHF to `mphf1.bin` for downstream use.
|
/// 4. Persists the MPHF to `mphf1.bin` for downstream use.
|
||||||
/// 5. Writes a global `kmer_spectrum_raw.json` at the partition root.
|
///
|
||||||
|
/// Returns the aggregated `KmerSpectrum`. Per-partition spectrum files are
|
||||||
|
/// deleted after aggregation unless `keep_partial` is true.
|
||||||
///
|
///
|
||||||
/// Partitions are processed in parallel via Rayon (one task per thread).
|
/// Partitions are processed in parallel via Rayon (one task per thread).
|
||||||
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
|
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
|
||||||
pub fn count_kmer(&self) -> SKResult<()> {
|
pub fn count_kmer(&self, keep_partial: bool) -> SKResult<KmerSpectrum> {
|
||||||
let sys = System::new_all();
|
let sys = System::new_all();
|
||||||
let available = match sys.available_memory() {
|
let available = match sys.available_memory() {
|
||||||
0 => sys.total_memory() / 2,
|
0 => sys.total_memory() / 2,
|
||||||
@@ -282,10 +290,10 @@ impl KmerPartition {
|
|||||||
r?;
|
r?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aggregate per-partition spectra into a global one at the root.
|
// Aggregate per-partition spectra.
|
||||||
let mut global_spectrum: BTreeMap<u32, u64> = BTreeMap::new();
|
let mut counts: BTreeMap<u32, u64> = BTreeMap::new();
|
||||||
let mut global_f0: u64 = 0;
|
let mut f0: u64 = 0;
|
||||||
let mut global_f1: u64 = 0;
|
let mut f1: u64 = 0;
|
||||||
|
|
||||||
for i in 0..self.n_partitions {
|
for i in 0..self.n_partitions {
|
||||||
let path = self.part_dir(i).join("kmer_spectrum_raw.json");
|
let path = self.part_dir(i).join("kmer_spectrum_raw.json");
|
||||||
@@ -294,28 +302,21 @@ impl KmerPartition {
|
|||||||
}
|
}
|
||||||
let v: serde_json::Value =
|
let v: serde_json::Value =
|
||||||
serde_json::from_str(&fs::read_to_string(&path)?).map_err(io::Error::other)?;
|
serde_json::from_str(&fs::read_to_string(&path)?).map_err(io::Error::other)?;
|
||||||
global_f0 += v["f0"].as_u64().unwrap_or(0);
|
f0 += v["f0"].as_u64().unwrap_or(0);
|
||||||
global_f1 += v["f1"].as_u64().unwrap_or(0);
|
f1 += v["f1"].as_u64().unwrap_or(0);
|
||||||
if let Some(obj) = v["spectrum"].as_object() {
|
if let Some(obj) = v["spectrum"].as_object() {
|
||||||
for (c_str, freq) in obj {
|
for (c_str, freq) in obj {
|
||||||
if let (Ok(c), Some(f)) = (c_str.parse::<u32>(), freq.as_u64()) {
|
if let (Ok(c), Some(f)) = (c_str.parse::<u32>(), freq.as_u64()) {
|
||||||
*global_spectrum.entry(c).or_insert(0) += f;
|
*counts.entry(c).or_insert(0) += f;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !keep_partial {
|
||||||
|
let _ = fs::remove_file(&path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let global_spectrum_map: BTreeMap<String, u64> = global_spectrum
|
Ok(KmerSpectrum { f0, f1, counts })
|
||||||
.iter()
|
|
||||||
.map(|(&c, &f)| (format!("{c:010}"), f))
|
|
||||||
.collect();
|
|
||||||
serde_json::to_writer_pretty(
|
|
||||||
fs::File::create(self.root_path.join("kmer_spectrum_raw.json"))?,
|
|
||||||
&serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }),
|
|
||||||
)
|
|
||||||
.map_err(io::Error::other)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── private ───────────────────────────────────────────────────────────────
|
// ── private ───────────────────────────────────────────────────────────────
|
||||||
|
|||||||
Reference in New Issue
Block a user