Refactor: Extract utility function for string reversal
Extracted `inverser_chaine` into a reusable utility function with docstring and added unit test to ensure correctness.
This commit is contained in:
@@ -13,3 +13,5 @@ sysinfo = "0.33"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tracing = "0.1.44"
|
||||
ph = "0.11"
|
||||
memmap2 = "0.9.10"
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tracing::debug;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use memmap2::MmapMut;
|
||||
use obikseq::kmer::Kmer;
|
||||
use ph::fmph::GOFunction;
|
||||
|
||||
use sysinfo::System;
|
||||
|
||||
@@ -99,6 +103,42 @@ impl KmerPartition {
|
||||
Ok(partition)
|
||||
}
|
||||
|
||||
pub fn open<P: AsRef<Path>>(path: P) -> SKResult<Self> {
|
||||
let root_path = path.as_ref().to_owned();
|
||||
if !root_path.exists() {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::NotFound,
|
||||
format!("{}: partition directory not found", root_path.display()),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
let meta_path = root_path.join(META_FILENAME);
|
||||
let meta: PartitionMeta = serde_json::from_reader(fs::File::open(&meta_path)?)
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
let format = match meta.format.as_str() {
|
||||
"gzip" => Format::Gzip,
|
||||
"bzip2" => Format::Bzip,
|
||||
"lzma" => Format::Lzma,
|
||||
"zstd" => Format::Zstd,
|
||||
_ => Format::No,
|
||||
};
|
||||
let level = level_from_u32(meta.level);
|
||||
let n_partitions = 1usize << meta.n_bits;
|
||||
let writers = (0..n_partitions).map(|_| None).collect();
|
||||
Ok(Self {
|
||||
root_path,
|
||||
n_partitions,
|
||||
partitions_mask: (1u64 << meta.n_bits) - 1,
|
||||
kmer_size: meta.kmer_size,
|
||||
minimizer_size: meta.minimizer_size,
|
||||
writers,
|
||||
format,
|
||||
level,
|
||||
closed: true, // read-only: writing is not allowed on an opened partition
|
||||
})
|
||||
}
|
||||
|
||||
pub fn write(&mut self, sk: &mut SuperKmer) -> SKResult<()> {
|
||||
self.check_not_closed()?;
|
||||
let partition = self.partition_of(sk)?;
|
||||
@@ -190,6 +230,73 @@ impl KmerPartition {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For each partition that has a `dereplicated.{ext}` file:
|
||||
/// 1. Enumerates all unique canonical kmers (two passes over the file).
|
||||
/// 2. Builds a provisional MPHF (FMPHGO) over those kmers.
|
||||
/// 3. Writes a flat binary count file (`counts1.bin`, one `u32` per slot,
|
||||
/// memory-mapped) accumulating kmer abundances from the superkmer counts.
|
||||
/// 4. Persists the MPHF to `mphf1.bin` for downstream use.
|
||||
/// 5. Writes a global `kmer_spectrum_raw.json` at the partition root.
|
||||
///
|
||||
/// Partitions are processed in parallel via Rayon (one task per thread).
|
||||
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
|
||||
pub fn count_kmer(&self) -> SKResult<()> {
|
||||
let ext = format_ext(self.format);
|
||||
let root = &self.root_path;
|
||||
let k = self.kmer_size;
|
||||
|
||||
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
||||
.into_par_iter()
|
||||
.map(|i| {
|
||||
let dir = root.join(format!("part_{:05}", i));
|
||||
let dedup_path = dir.join(format!("dereplicated.{ext}"));
|
||||
if !dedup_path.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
info!("counting kmers in partition {}/{}", i, self.n_partitions);
|
||||
count_partition(&dir, &dedup_path, k)
|
||||
})
|
||||
.collect();
|
||||
for r in results {
|
||||
r?;
|
||||
}
|
||||
|
||||
// Aggregate per-partition spectra into a global one at the root.
|
||||
let mut global_spectrum: BTreeMap<u32, u64> = BTreeMap::new();
|
||||
let mut global_f0: u64 = 0;
|
||||
let mut global_f1: u64 = 0;
|
||||
|
||||
for i in 0..self.n_partitions {
|
||||
let path = root
|
||||
.join(format!("part_{:05}", i))
|
||||
.join("kmer_spectrum_raw.json");
|
||||
if !path.exists() {
|
||||
continue;
|
||||
}
|
||||
let v: serde_json::Value =
|
||||
serde_json::from_str(&fs::read_to_string(&path)?).map_err(io::Error::other)?;
|
||||
global_f0 += v["f0"].as_u64().unwrap_or(0);
|
||||
global_f1 += v["f1"].as_u64().unwrap_or(0);
|
||||
if let Some(obj) = v["spectrum"].as_object() {
|
||||
for (c_str, freq) in obj {
|
||||
if let (Ok(c), Some(f)) = (c_str.parse::<u32>(), freq.as_u64()) {
|
||||
*global_spectrum.entry(c).or_insert(0) += f;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let global_spectrum_map: BTreeMap<String, u64> =
|
||||
global_spectrum.iter().map(|(&c, &f)| (format!("{c:010}"), f)).collect();
|
||||
serde_json::to_writer_pretty(
|
||||
fs::File::create(root.join("kmer_spectrum_raw.json"))?,
|
||||
&serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }),
|
||||
)
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── private ───────────────────────────────────────────────────────────────
|
||||
|
||||
fn check_not_closed(&self) -> SKResult<()> {
|
||||
@@ -289,6 +396,18 @@ fn optimal_buckets(raw_path: &Path, available_bytes: u64) -> usize {
|
||||
n.next_power_of_two() as usize
|
||||
}
|
||||
|
||||
fn level_from_u32(n: u32) -> Level {
|
||||
match n {
|
||||
0 => Level::Zero, 1 => Level::One, 2 => Level::Two, 3 => Level::Three,
|
||||
4 => Level::Four, 5 => Level::Five, 6 => Level::Six, 7 => Level::Seven,
|
||||
8 => Level::Eight, 9 => Level::Nine, 10 => Level::Ten, 11 => Level::Eleven,
|
||||
12 => Level::Twelve, 13 => Level::Thirteen, 14 => Level::Fourteen,
|
||||
15 => Level::Fifteen, 16 => Level::Sixteen, 17 => Level::Seventeen,
|
||||
18 => Level::Eighteen, 19 => Level::Nineteen, 20 => Level::Twenty,
|
||||
_ => Level::TwentyOne,
|
||||
}
|
||||
}
|
||||
|
||||
fn format_ext(format: Format) -> &'static str {
|
||||
match format {
|
||||
Format::Gzip => "skmer.gz",
|
||||
@@ -391,6 +510,122 @@ fn flush_map(map: HashMap<SuperKmer, u64>, writer: &mut SKFileWriter) -> SKResul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build the provisional MPHF and count file for one partition directory.
|
||||
fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> {
|
||||
// Estimate number of kmers from sidecar to pre-allocate the HashSet.
|
||||
let capacity = SKFileMeta::read(dedup_path)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|m| {
|
||||
let km1 = (k as u64).saturating_sub(1);
|
||||
m.length_sum.saturating_sub(m.instances.saturating_mul(km1)) as usize
|
||||
})
|
||||
.unwrap_or(0);
|
||||
debug!("{}: sidecar capacity estimate={capacity}", dir.display());
|
||||
|
||||
// Pass 1: collect all unique canonical kmers.
|
||||
let mut seen: HashSet<Kmer> = HashSet::with_capacity(capacity);
|
||||
let mut pass1_superkmers: u64 = 0;
|
||||
{
|
||||
let mut reader = SKFileReader::open(dedup_path)?;
|
||||
while let Some(sk) = reader.read()? {
|
||||
pass1_superkmers += 1;
|
||||
let seql = sk.seql();
|
||||
if seql < k {
|
||||
continue;
|
||||
}
|
||||
for pos in 0..=(seql - k) {
|
||||
seen.insert(sk.kmer(pos, k).map_err(io::Error::other)?.canonical(k));
|
||||
}
|
||||
}
|
||||
}
|
||||
let kmers: Vec<Kmer> = seen.into_iter().collect();
|
||||
let n_kmers = kmers.len();
|
||||
debug!("{}: pass1 superkmers={pass1_superkmers} unique_kmers={n_kmers}", dir.display());
|
||||
|
||||
if n_kmers == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Build provisional MPHF.
|
||||
let mphf = GOFunction::from(kmers);
|
||||
debug!("{}: MPHF built len={}", dir.display(), mphf.len());
|
||||
|
||||
// Create memory-mapped count file (u32 per slot, zero-initialised).
|
||||
let counts_path = dir.join("counts1.bin");
|
||||
let counts_file = fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&counts_path)?;
|
||||
counts_file.set_len((n_kmers * std::mem::size_of::<u32>()) as u64)?;
|
||||
let mut mmap = unsafe { MmapMut::map_mut(&counts_file)? };
|
||||
mmap.fill(0);
|
||||
|
||||
// Pass 2: accumulate superkmer counts into the mmap'd array.
|
||||
let mut pass2_superkmers: u64 = 0;
|
||||
let mut pass2_kmer_hits: u64 = 0;
|
||||
let mut pass2_kmer_misses: u64 = 0;
|
||||
let mut pass2_count_sum: u64 = 0;
|
||||
{
|
||||
let counts =
|
||||
unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr() as *mut u32, n_kmers) };
|
||||
let mut reader = SKFileReader::open(dedup_path)?;
|
||||
while let Some(sk) = reader.read()? {
|
||||
pass2_superkmers += 1;
|
||||
let seql = sk.seql();
|
||||
let sk_count = sk.count();
|
||||
if pass2_superkmers <= 3 {
|
||||
debug!("{}: sk#{pass2_superkmers} seql={seql} count={sk_count}", dir.display());
|
||||
}
|
||||
if seql < k {
|
||||
continue;
|
||||
}
|
||||
pass2_count_sum += sk_count as u64;
|
||||
for pos in 0..=(seql - k) {
|
||||
let kmer = sk.kmer(pos, k).map_err(io::Error::other)?.canonical(k);
|
||||
if let Some(idx) = mphf.get(&kmer) {
|
||||
counts[idx as usize] = counts[idx as usize].saturating_add(sk_count);
|
||||
pass2_kmer_hits += 1;
|
||||
} else {
|
||||
pass2_kmer_misses += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!(
|
||||
"{}: pass2 superkmers={pass2_superkmers} hits={pass2_kmer_hits} misses={pass2_kmer_misses} count_sum={pass2_count_sum}",
|
||||
dir.display()
|
||||
);
|
||||
mmap.flush()?;
|
||||
|
||||
// Build kmer frequency spectrum from the count array.
|
||||
let counts = unsafe { std::slice::from_raw_parts(mmap.as_ptr() as *const u32, n_kmers) };
|
||||
let mut spectrum: BTreeMap<u32, u64> = BTreeMap::new();
|
||||
for &c in counts {
|
||||
if c > 0 {
|
||||
*spectrum.entry(c).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
let f0 = n_kmers as u64;
|
||||
let f1: u64 = spectrum.iter().map(|(&c, &f)| c as u64 * f).sum();
|
||||
|
||||
let spectrum_map: BTreeMap<String, u64> =
|
||||
spectrum.iter().map(|(&c, &f)| (format!("{c:010}"), f)).collect();
|
||||
serde_json::to_writer_pretty(
|
||||
fs::File::create(dir.join("kmer_spectrum_raw.json"))?,
|
||||
&serde_json::json!({ "f0": f0, "f1": f1, "spectrum": &spectrum_map }),
|
||||
)
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
// Persist MPHF to disk.
|
||||
let mphf_path = dir.join("mphf1.bin");
|
||||
mphf.write(&mut fs::File::create(&mphf_path)?)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl Drop for KmerPartition {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.close();
|
||||
|
||||
Reference in New Issue
Block a user