10 Commits

Author SHA1 Message Date
Eric Coissac 17c9e076bd refactor: extract obikindex crate and remove deprecated CLI commands
Extracted core indexing logic, state tracking, and metadata management into a new `obikindex` crate. Refactored the `index` and `unitig` commands to leverage the `KmerIndex` abstraction and state-driven pipeline transitions. Removed obsolete CLI subcommands (`count`, `fasta`, `longtig`, `partition`) and their associated pipeline steps. Updated FASTA writing utilities for single-line output and deterministic identifiers, and refreshed workspace dependencies.
2026-05-20 18:54:12 +02:00
Eric Coissac f8cfb493b8 refactor: extract pipeline stages and centralize partition directory paths
Extracts the scatter, dereplicate/count, and index pipeline stages into a new `steps` module to improve modularity. Centralizes partition directory path construction by introducing a `part_dir()` helper, replacing manual string formatting across multiple command files. Adds `--with-counts` and `--keep-intermediate` CLI flags to the index command and fixes a typo in the `partition_dir` parameter name.
2026-05-20 18:42:09 +02:00
Eric Coissac cc2ed4bd31 feat: Add progress tracking and timing instrumentation to index
Introduces comprehensive progress tracking and timing instrumentation using indicatif and obisys::Reporter/Stage. Adds an EMA-based throughput calculator for the scatter phase and wraps parallel progress bars in Arc<Mutex> for thread-safe concurrent updates across all pipeline stages.
2026-05-20 18:34:28 +02:00
Eric Coissac e66c4d81ef feat(obikmer): add index subcommand for kmer counting pipeline
Introduce the `index` CLI subcommand, implementing a resumable, multi-stage pipeline to partition, dereplicate, and count kmers from input sequences. The command builds a layered de Bruijn graph index per partition, applies optional abundance filtering, and persists unitigs alongside an MPHF-based count matrix. Update `Cargo.toml` and `Cargo.lock` to include new dependencies (`epserde`, `ptr_hash`, `cacheline-ef`, `obicompactvec`, `obilayeredmap`) required for the index builder, and refresh the profiling data files.
2026-05-20 18:25:12 +02:00
Eric Coissac c20a1ed465 perf: optimize k-mer pipeline with compile-time tables
This commit shifts entropy and lookup table generation to compile time via a new build script, eliminating runtime overhead. It replaces heap-allocated queues in rolling statistics with a stack-allocated, const-generic ring buffer for cache-friendly operations, and implements `size_hint` on `SuperKmerIter` for efficient iterator consumption. Additionally, it establishes the baseline profile configuration and sets global k-mer parameters.
2026-05-20 15:54:20 +02:00
Eric Coissac 9a1c0c0ee0 Add CLI progress bars and throughput metrics to partitioning
Add `indicatif` v0.17 to `obikmer` and `obikpartitionner` to instrument CLI workflows with real-time progress tracking. The changes integrate progress spinners and bars into the batch processing and parallel kmer counting loops, displaying processed base pairs, throughput rates, and elapsed time. Updates occur every 0.1s to enhance observability without modifying core partitioning logic.
2026-05-20 15:46:52 +02:00
Eric Coissac b80ab77d66 perf: Switch to sequential PHF construction to avoid thread contention
The outer partition loop already saturates parallelism, making parallel PHF construction redundant and causing Rayon thread pool contention. This change switches to a sequential variant to improve performance. Additionally, explicit error handling is now added for construction failures, while preserving the existing mmap-backed kmer slice.
2026-05-20 12:48:42 +02:00
Eric Coissac 6e2a4c977b fix: Replace unreliable memory pressure check with swap indicator
The previous `major_faults > 10` check is unreliable on macOS, as it counts file-backed mmap page-ins rather than true memory pressure. This change replaces it with `swaps > 0`, a more accurate cross-platform indicator of RAM exhaustion. The swap diagnosis message is also updated to clarify that the working set exceeds available RAM, and comments are added to document this rationale.
2026-05-19 11:41:41 +02:00
Eric Coissac 8c16b79983 feat(obikmer): add obisys profiling to partition pipeline
Added obisys as a local dependency and integrated its Reporter and Stage instrumentation into the partition command. Each major phase (scatter, dereplicate, and kmer-counting) is now wrapped in timing blocks, with aggregated execution times printed to stdout upon completion.
2026-05-19 11:40:20 +02:00
Eric Coissac d0c277d5b6 feat(obisys): Add stage-based performance profiler
Establishes the `obisys` crate using Rust 2024 and the `libc` dependency. Introduces a lightweight profiler that captures wall-clock time and `getrusage` system metrics per pipeline stage. Automatically computes parallelism and efficiency ratios, detects bottlenecks such as memory pressure and disk I/O, and prints a formatted diagnostic summary to stderr.
2026-05-19 11:40:20 +02:00
37 changed files with 1741 additions and 1632 deletions
Vendored
BIN
View File
Binary file not shown.
+27 -3
View File
@@ -1,4 +1,28 @@
## Dans OBILayeredMap ## Chose à vérifier suite à la commande index
- Est-ce que CachelineEfVec est vraiment justifier dans notre cas. vu les contraintes sur la distribution des valeurs imposées par CachelineEfVec en terme d'ordre, de grandeur et de dispersion ? - partition.meta ne devrait plus exister
- Il semble que le count de kmer soit stocké, ce qui doit-être une possibilité pas une obligation. - les spectrums globaux devrait etre identifier par génome
- regrouper dans un sous-dossier spectrums à la racine de l'index avec un nom basé sur le génome
- les spectrum patiels ont-ils vocation à être conserver ?
- l'étape de déreplication dure quasiment autant de temps que le comptage mais ne laisse aucune trace de progression à l'utilisateur
## commandes à ajouter
- merge : pour construire un index à partir d'index existants
- deux modes : count et presence/absence. count exige que tous les index mergés soient déjà en mode count. mode presence/absence par defaut. Si passage de mode count à mode presence/absence, par defaut presence = count >= 1. Possibilité de spécifier un seuil personnalisé.
- filter : produit un nouvel index filtré à partir d'un index existant en verifiant que les kmer présents dans le nouvel index respectent les critères de filtrage spécifiés
- quorum de presence en fraction-(min/max) du nombre de génomes, en nombre-(min/max) de génomes, si mode count la présence peut être défini par un seuil personnalisé minimum et maximum
- aggregate : aggrege toutes les colonnes d'une matrice d'index en une seule colonne.
- query : scan un fichier de sequences et retourne pour chaque sequence quels kmer sont présents dans l'index et dans quel genomes
- distance : calcule la matrice de distance entre les genomes
- proposer une option pour chaque distance à calculer
- un possibité de récuperer la matrice des kmer communs
- un possibité de calculer l'arbre nj
- les matrices sont sauvegardées en CSV
- les arbres NJ sont sauvegardés en Newick avec les longeurs de branche
- dump : une table csv de l'index avec les kmer et les genomes associés en mode count ou presence/absence avec une option pour forcer le mode presence/absence meme si l'index est en mode count. Par defaut, le mode count est utilisé pour les index en mode count et le mode presence/absence pour les index en mode presence/absence.
Binary file not shown.
+115 -649
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -1,5 +1,5 @@
[workspace] [workspace]
resolver = "3" resolver = "3"
members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec", "obisys"] members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec", "obisys", "obikindex"]
[profile.release] [profile.release]
debug = 1 debug = 1
-109
View File
@@ -1,12 +1,10 @@
//use ahash::RandomState; //use ahash::RandomState;
use hashbrown::HashMap; use hashbrown::HashMap;
use obifastwrite::write_unitig;
use obikseq::k; use obikseq::k;
use obikseq::unitig::Unitig; use obikseq::unitig::Unitig;
use obikseq::{CanonicalKmer, Kmer, Sequence}; use obikseq::{CanonicalKmer, Kmer, Sequence};
use std::cell::Cell; use std::cell::Cell;
use std::fmt; use std::fmt;
use std::io;
use xxhash_rust::xxh3::Xxh3Builder; use xxhash_rust::xxh3::Xxh3Builder;
// ── Types ───────────────────────────────────────────────────────────────────── // ── Types ─────────────────────────────────────────────────────────────────────
@@ -293,59 +291,6 @@ impl GraphDeBruijn {
Some(oriented) Some(oriented)
} }
fn next_longtig_kmer(&self, kmer: Kmer) -> Option<Kmer> {
let canonical = kmer.canonical();
let node = self.nodes.get(&canonical)?.get();
let direct = kmer.raw() == canonical.raw();
if (direct && node.n_right_neighbours() == 0) || (!direct && node.n_left_neighbours() == 0)
{
return None;
}
let next_c: CanonicalKmer = if direct {
if node.can_extend_right() {
canonical
.into_kmer()
.push_right(node.right_nuc())
.canonical()
} else {
self.iter_right_neighbors(canonical)
.filter(|n| !self.is_visited(n).unwrap_or(true))
.next()?
}
} else {
if node.can_extend_left() {
canonical.into_kmer().push_left(node.left_nuc()).canonical()
} else {
self.iter_left_neighbors(canonical)
.filter(|n| !self.is_visited(n).unwrap_or(true))
.next()?
}
};
let cell = self.nodes.get(&next_c)?;
let next_node = cell.get();
if next_node.is_visited() {
return None;
}
let oriented = oriented_next(kmer, next_c);
let ndirect = oriented.raw() == next_c.raw();
if (ndirect && next_node.n_right_neighbours() > 1)
|| (!ndirect && next_node.n_left_neighbours() > 1)
{
return None;
}
let mut updated = next_node;
updated.set_visited();
cell.set(updated);
Some(oriented)
}
fn iter_unitig_kmers(&self, start: Kmer) -> UnitigIter<'_> { fn iter_unitig_kmers(&self, start: Kmer) -> UnitigIter<'_> {
UnitigIter { UnitigIter {
graph: self, graph: self,
@@ -353,13 +298,6 @@ impl GraphDeBruijn {
} }
} }
fn iter_longtig_kmers(&self, start: Kmer) -> LongtigIter<'_> {
LongtigIter {
graph: self,
current: Some(start),
}
}
pub fn iter_unitig(&self) -> impl Iterator<Item = Unitig> + '_ { pub fn iter_unitig(&self) -> impl Iterator<Item = Unitig> + '_ {
let k = k(); let k = k();
self.start_iter().map(move |(start, first_next)| { self.start_iter().map(move |(start, first_next)| {
@@ -373,36 +311,6 @@ impl GraphDeBruijn {
}) })
} }
pub fn iter_longtig(&self) -> impl Iterator<Item = Unitig> + '_ {
let k = k();
self.start_iter().map(move |(start, first_next)| {
let mut nucs: Vec<u8> = (0..k).map(|i| start.nucleotide(i)).collect();
if let Some(next_c) = first_next {
for kmer in self.iter_longtig_kmers(next_c) {
nucs.push(kmer.nucleotide(k - 1));
}
}
Unitig::from_nucleotides(&nucs)
})
}
/// Write all unitigs to `out` in FASTA format.
///
/// Calls [`obifastwrite::write_unitig`] for each unitig produced by
/// [`iter_unitig`]. Stops and returns the first I/O error encountered.
pub fn write_fasta<W: io::Write>(&self, out: &mut W, unitig: bool) -> io::Result<()> {
if unitig {
for unitig in self.iter_unitig() {
write_unitig(&unitig, k(), out)?;
}
} else {
for unitig in self.iter_longtig() {
write_unitig(&unitig, k(), out)?;
}
}
Ok(())
}
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.nodes.len() self.nodes.len()
} }
@@ -516,23 +424,6 @@ impl Iterator for UnitigIter<'_> {
} }
} }
// ── UnitigIter ────────────────────────────────────────────────────────────────
struct LongtigIter<'a> {
graph: &'a GraphDeBruijn,
current: Option<Kmer>,
}
impl Iterator for LongtigIter<'_> {
type Item = Kmer;
fn next(&mut self) -> Option<Kmer> {
let current = self.current?;
self.current = self.graph.next_longtig_kmer(current);
Some(current)
}
}
// ── helpers ─────────────────────────────────────────────────────────────────── // ── helpers ───────────────────────────────────────────────────────────────────
fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer {
+30 -12
View File
@@ -1,9 +1,11 @@
use std::fmt; use std::fmt;
use std::io::{self, Write}; use std::io::{self, Write};
use xxhash_rust::xxh64::xxh64;
pub(crate) enum JsonVal<'a> { /// A JSON value that is either a number or a quoted string.
pub enum JsonVal<'a> {
/// Integer value, serialised without quotes.
Num(u64), Num(u64),
/// String value, serialised with double quotes.
Str(&'a str), Str(&'a str),
} }
@@ -16,11 +18,8 @@ impl fmt::Display for JsonVal<'_> {
} }
} }
pub(crate) fn seq_id(ascii: &[u8]) -> String { /// Write a JSON object `{"k1":v1,"k2":v2,...}` to `writer`.
format!("{:016X}", xxh64(ascii, 0)) pub fn annotation<W: Write>(
}
pub(crate) fn annotation<W: Write>(
writer: &mut W, writer: &mut W,
fields: &[(&str, JsonVal<'_>)], fields: &[(&str, JsonVal<'_>)],
) -> io::Result<()> { ) -> io::Result<()> {
@@ -34,10 +33,29 @@ pub(crate) fn annotation<W: Write>(
write!(writer, "}}") write!(writer, "}}")
} }
pub(crate) fn write_sequence<W: Write>(writer: &mut W, seq: &[u8], width: usize) -> io::Result<()> { /// xxHash-64 of `ascii`, formatted as 16 uppercase hex digits.
for chunk in seq.chunks(width) { pub fn seq_id(ascii: &[u8]) -> String {
// SAFETY: seq is valid UTF-8; any contiguous slice of ASCII bytes is too use xxhash_rust::xxh64::xxh64;
writeln!(writer, "{}", unsafe { std::str::from_utf8_unchecked(chunk) })?; format!("{:016X}", xxh64(ascii, 0))
} }
Ok(())
/// Write `seq` as one line of ASCII DNA, followed by a newline.
pub fn write_sequence<W: Write>(writer: &mut W, seq: &[u8]) -> io::Result<()> {
// SAFETY: seq is valid ASCII DNA (A/C/G/T).
writeln!(writer, "{}", unsafe { std::str::from_utf8_unchecked(seq) })
}
/// Core FASTA record writer.
///
/// Writes `>{id} {annotation}\n{sequence}\n`.
pub fn write_record<W: Write>(
seq: &[u8],
id: &str,
fields: &[(&str, JsonVal<'_>)],
out: &mut W,
) -> io::Result<()> {
write!(out, ">{id} ")?;
annotation(out, fields)?;
writeln!(out)?;
write_sequence(out, seq)
} }
+89 -116
View File
@@ -1,32 +1,20 @@
//! FASTA serialisation of [`SuperKmer`] values. //! FASTA serialisation for obikmer sequence types.
//! //!
//! Two functions cover the two phases of the scatter pipeline: //! Three public functions cover the main output cases:
//! //!
//! - [`write_scatter`]: scatter phase (before routing). The header annotation //! - [`write_scatter`]: super-kmers in scatter phase (minimizer annotation)
//! contains the minimizer sequence decoded from [`SuperKmer::minimizer_pos`]. //! - [`write_count`]: super-kmers in count phase (occurrence count annotation)
//! - [`write_unitig`]: unitigs from the layered index (partition + index annotation)
//! //!
//! - [`write_count`]: count phase (after deduplication). The header annotation //! All produce OBITools-compatible FASTA:
//! contains the occurrence count from [`SuperKmer::count`].
//!
//! Both functions write standard OBITools-compatible FASTA:
//! //!
//! ```text //! ```text
//! >ID {"seq_length":32,"kmer_size":31,"minimizer_size":11,"partition":42,"minimizer":"CGTGCTAGATC"} //! >ID {"key":value,...}
//! GCTAGCATGCTAGCTGTAGCTGTGAGTGCTG //! SEQUENCE
//! ``` //! ```
//! //!
//! The record identifier is the xxHash-64 of the ASCII sequence, formatted as //! The lower-level primitive [`write_record`] and the [`JsonVal`] type are also
//! a 16-digit uppercase hexadecimal string. xxHash-64 is collision-resistant //! public for callers that need custom annotations.
//! enough for debugging identifiers (collision probability < 1e-9 for billions
//! of distinct super-kmers).
//!
//! # Phase contract
//!
//! `write_scatter` reads [`SuperKmer::minimizer_pos`], which is only valid
//! **before** [`SuperKmer::init_count`] is called. `write_count` reads
//! [`SuperKmer::count`], which is only meaningful **after** `init_count`.
//! Mixing the two functions in the wrong phase produces silently wrong output;
//! this is enforced by pipeline structure, not by the type system.
#![deny(missing_docs)] #![deny(missing_docs)]
@@ -35,22 +23,15 @@ mod fasta;
use std::io::{self, Write}; use std::io::{self, Write};
use obikseq::{Minimizer, SuperKmer, Unitig}; use obikseq::{Minimizer, SuperKmer, Unitig};
use xxhash_rust::xxh64::xxh64;
pub use fasta::{JsonVal, annotation, seq_id, write_record};
// ── public API ──────────────────────────────────────────────────────────────── // ── public API ────────────────────────────────────────────────────────────────
/// Write one super-kmer in FASTA format — **scatter phase**. /// Write one super-kmer in FASTA format — **scatter phase**.
/// ///
/// The `minimizer` field in the JSON annotation contains the ASCII sequence of /// ID is the xxHash-64 of the sequence. JSON annotation includes
/// the minimizer, decoded from [`SuperKmer::minimizer_pos`] (scatter-phase /// `seq_length`, `kmer_size`, `minimizer_size`, `partition`, `minimizer`.
/// value of the payload field).
///
/// # Parameters
/// - `sk`: the super-kmer to serialise (must be in scatter phase)
/// - `out`: destination writer
/// - `k`: k-mer size used to build `sk`
/// - `m`: minimizer size
/// - `partition`: partition index computed from the minimizer hash
pub fn write_scatter<W: Write>( pub fn write_scatter<W: Write>(
sk: &SuperKmer, sk: &SuperKmer,
out: &mut W, out: &mut W,
@@ -61,37 +42,26 @@ pub fn write_scatter<W: Write>(
) -> io::Result<()> { ) -> io::Result<()> {
let ascii = sk.to_ascii(); let ascii = sk.to_ascii();
let id = seq_id(&ascii); let id = seq_id(&ascii);
let seq_len = ascii.len();
let min_seq = minimizer.to_ascii(); let min_seq = minimizer.to_ascii();
let min_str = unsafe { std::str::from_utf8_unchecked(&min_seq) };
writeln!( write_record(
&ascii,
&id,
&[
("seq_length", JsonVal::Num(ascii.len() as u64)),
("kmer_size", JsonVal::Num(k as u64)),
("minimizer_size",JsonVal::Num(m as u64)),
("partition", JsonVal::Num(partition as u64)),
("minimizer", JsonVal::Str(min_str)),
],
out, out,
">{id} {{\"seq_length\":{seq_len},\"kmer_size\":{k},\ )
\"minimizer_size\":{m},\"partition\":{partition},\
\"minimizer\":\"{min}\"}}",
id = id,
seq_len = seq_len,
k = k,
m = m,
partition = partition,
min = unsafe { std::str::from_utf8_unchecked(&min_seq) },
)?;
out.write_all(&ascii)?;
out.write_all(b"\n")
} }
/// Write one super-kmer in FASTA format — **count phase**. /// Write one super-kmer in FASTA format — **count phase**.
/// ///
/// The `count` field in the JSON annotation contains the occurrence count from /// ID is the xxHash-64 of the sequence. JSON annotation includes
/// [`SuperKmer::count`] (count-phase value of the payload field). /// `seq_length`, `kmer_size`, `minimizer_size`, `partition`, `count`.
///
/// # Parameters
/// - `sk`: the super-kmer to serialise (must be in count phase, i.e. after
/// [`SuperKmer::init_count`] has been called)
/// - `out`: destination writer
/// - `k`: k-mer size
/// - `m`: minimizer size
/// - `partition`: partition index
pub fn write_count<W: Write>( pub fn write_count<W: Write>(
sk: &SuperKmer, sk: &SuperKmer,
out: &mut W, out: &mut W,
@@ -101,52 +71,47 @@ pub fn write_count<W: Write>(
) -> io::Result<()> { ) -> io::Result<()> {
let ascii = sk.to_ascii(); let ascii = sk.to_ascii();
let id = seq_id(&ascii); let id = seq_id(&ascii);
let seq_len = ascii.len(); write_record(
let count = sk.count(); &ascii,
&id,
writeln!( &[
("seq_length", JsonVal::Num(ascii.len() as u64)),
("kmer_size", JsonVal::Num(k as u64)),
("minimizer_size",JsonVal::Num(m as u64)),
("partition", JsonVal::Num(partition as u64)),
("count", JsonVal::Num(sk.count() as u64)),
],
out, out,
">{id} {{\"seq_length\":{seq_len},\"kmer_size\":{k},\ )
\"minimizer_size\":{m},\"partition\":{partition},\
\"count\":{count}}}",
id = id,
seq_len = seq_len,
k = k,
m = m,
partition = partition,
count = count,
)?;
out.write_all(&ascii)?;
out.write_all(b"\n")
} }
/// Write one unitig in FASTA format. /// Write one unitig in FASTA format.
/// ///
/// Header annotation (JSON): /// ID is `part_PPPPP_unitig_IIIIII` where `P` is the partition index and `I`
/// ```text /// is the unitig index within that partition. JSON annotation includes
/// >HASH {"seq_length":<seql>,"kmer_size":<k>,"n_kmers":<seql-k+1>} /// `seq_length`, `kmer_size`, `n_kmers`, `partition`, `unitig_index`.
/// ``` pub fn write_unitig<W: Write>(
/// unitig: &Unitig,
/// `HASH` is the xxHash-64 of the ASCII sequence (16 uppercase hex digits). k: usize,
/// `n_kmers` is the number of distinct k-mers covered by this unitig. partition: usize,
pub fn write_unitig<W: Write>(unitig: &Unitig, k: usize, out: &mut W) -> io::Result<()> { index: usize,
out: &mut W,
) -> io::Result<()> {
let ascii = unitig.to_ascii(); let ascii = unitig.to_ascii();
let id = seq_id(&ascii);
let seql = unitig.seql(); let seql = unitig.seql();
let n_kmers = seql - k + 1; let id = format!("part_{partition:05}_unitig_{index:06}");
writeln!( write_record(
&ascii,
&id,
&[
("seq_length", JsonVal::Num(seql as u64)),
("kmer_size", JsonVal::Num(k as u64)),
("n_kmers", JsonVal::Num((seql - k + 1) as u64)),
("partition", JsonVal::Num(partition as u64)),
("unitig_index", JsonVal::Num(index as u64)),
],
out, out,
">{id} {{\"seq_length\":{seql},\"kmer_size\":{k},\"n_kmers\":{n_kmers}}}", )
)?;
out.write_all(&ascii)?;
out.write_all(b"\n")
}
// ── internal helpers ──────────────────────────────────────────────────────────
/// xxHash-64 of the ASCII sequence, formatted as 16 uppercase hex digits.
fn seq_id(ascii: &[u8]) -> String {
format!("{:016X}", xxh64(ascii, 0))
} }
// ── tests ───────────────────────────────────────────────────────────────────── // ── tests ─────────────────────────────────────────────────────────────────────
@@ -178,9 +143,6 @@ mod tests {
#[test] #[test]
fn scatter_minimizer_decoded_from_hash() { fn scatter_minimizer_decoded_from_hash() {
// "ACG" right-aligned: A=00, C=01, G=10 → 0b000110 = 6
// Left-aligned for m=3: shift by 64 2·3 = 58.
// set_m(3) so that Minimizer::to_ascii() decodes exactly 3 bases.
obikseq::params::set_m(3); obikseq::params::set_m(3);
let sk = make(b"ACGTACGTACGT"); let sk = make(b"ACGTACGTACGT");
let minimizer = Minimizer::from_raw_unchecked(6u64 << (64 - 2 * 3)); let minimizer = Minimizer::from_raw_unchecked(6u64 << (64 - 2 * 3));
@@ -230,13 +192,34 @@ mod tests {
#[test] #[test]
fn count_sequence_line_correct() { fn count_sequence_line_correct() {
// TTTTACGT canonicalises to ACGTAAAA (revcomp is ACGTAAAA < TTTTACGT)
let sk = make(b"TTTTACGT"); let sk = make(b"TTTTACGT");
let out = capture(|w| write_count(&sk, w, 4, 2, 0)); let out = capture(|w| write_count(&sk, w, 4, 2, 0));
let lines: Vec<&str> = out.lines().collect(); let lines: Vec<&str> = out.lines().collect();
assert_eq!(lines[1], "ACGTAAAA"); assert_eq!(lines[1], "ACGTAAAA");
} }
// ── write_unitig ──────────────────────────────────────────────────────────
#[test]
fn unitig_id_format() {
obikseq::params::set_k(4);
let unitig = obikseq::packed_seq::PackedSeq::from_ascii(b"ACGTACGT");
let out = capture(|w| write_unitig(&unitig, 4, 3, 17, w));
let id = out.lines().next().unwrap();
assert!(id.starts_with(">part_00003_unitig_000017"), "got: {id}");
}
#[test]
fn unitig_annotation_fields() {
obikseq::params::set_k(4);
let unitig = obikseq::packed_seq::PackedSeq::from_ascii(b"ACGTACGT");
let out = capture(|w| write_unitig(&unitig, 4, 2, 5, w));
assert!(out.contains("\"partition\":2"));
assert!(out.contains("\"unitig_index\":5"));
assert!(out.contains("\"n_kmers\":5"));
assert!(out.contains("\"kmer_size\":4"));
}
// ── ID stability ────────────────────────────────────────────────────────── // ── ID stability ──────────────────────────────────────────────────────────
#[test] #[test]
@@ -260,7 +243,7 @@ mod tests {
.next() .next()
.unwrap()[1..] .unwrap()[1..]
.to_string(); .to_string();
assert_eq!(id1, id2, "same sequence must produce same ID"); assert_eq!(id1, id2);
} }
#[test] #[test]
@@ -269,21 +252,11 @@ mod tests {
let sk2 = make(b"TTTTTTTT"); let sk2 = make(b"TTTTTTTT");
let id1 = capture(|w| write_scatter(&sk1, w, 4, 2, 0, Minimizer::from_raw_unchecked(0))) let id1 = capture(|w| write_scatter(&sk1, w, 4, 2, 0, Minimizer::from_raw_unchecked(0)))
.lines() .lines().next().unwrap()
.next() .split_whitespace().next().unwrap()[1..].to_string();
.unwrap()
.split_whitespace()
.next()
.unwrap()[1..]
.to_string();
let id2 = capture(|w| write_scatter(&sk2, w, 4, 2, 0, Minimizer::from_raw_unchecked(0))) let id2 = capture(|w| write_scatter(&sk2, w, 4, 2, 0, Minimizer::from_raw_unchecked(0)))
.lines() .lines().next().unwrap()
.next() .split_whitespace().next().unwrap()[1..].to_string();
.unwrap()
.split_whitespace()
.next()
.unwrap()[1..]
.to_string();
assert_ne!(id1, id2); assert_ne!(id1, id2);
} }
@@ -291,7 +264,7 @@ mod tests {
fn id_is_16_hex_digits() { fn id_is_16_hex_digits() {
let sk = make(b"ACGTACGT"); let sk = make(b"ACGTACGT");
let out = capture(|w| write_scatter(&sk, w, 4, 2, 0, Minimizer::from_raw_unchecked(0))); let out = capture(|w| write_scatter(&sk, w, 4, 2, 0, Minimizer::from_raw_unchecked(0)));
let id = &out.lines().next().unwrap()[1..17]; // skip '>' let id = &out.lines().next().unwrap()[1..17];
assert_eq!(id.len(), 16); assert_eq!(id.len(), 16);
assert!(id.chars().all(|c| c.is_ascii_hexdigit())); assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
} }
+21
View File
@@ -0,0 +1,21 @@
[package]
name = "obikindex"
version = "0.1.0"
edition = "2024"
[dependencies]
obikpartitionner = { path = "../obikpartitionner" }
obikseq = { path = "../obikseq" }
obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" }
obidebruinj = { path = "../obidebruinj" }
obilayeredmap = { path = "../obilayeredmap" }
obicompactvec = { path = "../obicompactvec" }
cacheline-ef = "1.1"
epserde = "0.8"
ptr_hash = "1.1"
rayon = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
indicatif = "0.17"
tracing = "0.1.44"
+53
View File
@@ -0,0 +1,53 @@
use std::fmt;
use std::io;
use obiskio::SKError;
use obilayeredmap::OLMError;
#[derive(Debug)]
pub enum OKIError {
Io(io::Error),
Json(serde_json::Error),
Partition(SKError),
Layer(OLMError),
}
pub type OKIResult<T> = Result<T, OKIError>;
impl fmt::Display for OKIError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OKIError::Io(e) => write!(f, "I/O error: {e}"),
OKIError::Json(e) => write!(f, "JSON error: {e}"),
OKIError::Partition(e) => write!(f, "partition error: {e}"),
OKIError::Layer(e) => write!(f, "layer error: {e}"),
}
}
}
impl std::error::Error for OKIError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
OKIError::Io(e) => Some(e),
OKIError::Json(e) => Some(e),
OKIError::Partition(e) => Some(e),
OKIError::Layer(e) => Some(e),
}
}
}
impl From<io::Error> for OKIError {
fn from(e: io::Error) -> Self { OKIError::Io(e) }
}
impl From<serde_json::Error> for OKIError {
fn from(e: serde_json::Error) -> Self { OKIError::Json(e) }
}
impl From<SKError> for OKIError {
fn from(e: SKError) -> Self { OKIError::Partition(e) }
}
impl From<OLMError> for OKIError {
fn from(e: OLMError) -> Self { OKIError::Layer(e) }
}
+301
View File
@@ -0,0 +1,301 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*;
use indicatif::{ProgressBar, ProgressStyle};
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
use obidebruinj::GraphDeBruijn;
use obikpartitionner::KmerPartition;
use obilayeredmap::layer::Layer;
use obiskio::{SKFileMeta, SKFileReader};
use obisys::{Reporter, Stage};
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
use rayon::prelude::*;
use tracing::info;
use crate::error::{OKIError, OKIResult};
use crate::meta::{IndexConfig, IndexMeta};
use crate::state::{IndexState, SENTINEL_INDEXED, SENTINEL_SCATTERED};
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
pub struct KmerIndex {
root_path: PathBuf,
meta: IndexMeta,
partition: KmerPartition,
}
impl KmerIndex {
/// Create a new index at `path`.
///
/// If `genome_label` is `Some`, it is stored immediately.
/// If `None`, the label will be derived from the first scatter input path
/// when `mark_scattered` is called.
pub fn create<P: AsRef<Path>>(
path: P,
config: IndexConfig,
genome_label: Option<String>,
force: bool,
) -> OKIResult<Self> {
let root_path = path.as_ref().to_owned();
let partition = KmerPartition::create(
&root_path,
config.n_bits,
config.kmer_size,
config.minimizer_size,
force,
)?;
let mut meta = IndexMeta::new(config);
if let Some(label) = genome_label {
meta.genomes.push(label);
}
meta.write(&root_path)?;
Ok(Self { root_path, meta, partition })
}
pub fn open<P: AsRef<Path>>(path: P) -> OKIResult<Self> {
let root_path = path.as_ref().to_owned();
let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?;
let partition = KmerPartition::open(&root_path)?;
Ok(Self { root_path, meta, partition })
}
/// Return `true` if `path` contains an `index.meta` file.
pub fn exists<P: AsRef<Path>>(path: P) -> bool {
IndexMeta::exists(path.as_ref())
}
/// Current construction state, as reported by sentinel files on disk.
pub fn state(&self) -> IndexState {
IndexState::detect(&self.root_path).unwrap_or(IndexState::Empty)
}
pub fn meta(&self) -> &IndexMeta { &self.meta }
pub fn kmer_size(&self) -> usize { self.meta.config.kmer_size }
pub fn minimizer_size(&self) -> usize { self.meta.config.minimizer_size }
pub fn n_partitions(&self) -> usize { self.partition.n_partitions() }
/// Expose the inner partition so the caller can run scatter into it.
/// Call `mark_scattered` once scatter is complete.
pub fn partition_mut(&mut self) -> &mut KmerPartition {
&mut self.partition
}
/// Mark scatter as complete and write `scatter.done`.
///
/// If no genome label was set at creation time, one is derived from
/// `first_scatter_path` (filename stripped of all extensions).
/// If `first_scatter_path` is also `None`, the label defaults to `"unknown"`.
pub fn mark_scattered(&mut self, first_scatter_path: Option<&Path>) -> OKIResult<()> {
if self.meta.genomes.is_empty() {
let label = first_scatter_path
.map(label_from_path)
.unwrap_or_else(|| "unknown".to_string());
self.meta.genomes.push(label);
self.meta.write(&self.root_path)?;
}
touch(&self.root_path.join(SENTINEL_SCATTERED))?;
Ok(())
}
/// Dereplicate all partitions then compute kmer counts.
///
/// Writes `kmer_spectrum_raw.json` at the index root upon completion
/// (this file doubles as the `Counted` sentinel).
pub fn dereplicate_and_count(&self, rep: &mut Reporter) -> OKIResult<()> {
let t = Stage::start("dereplicate");
self.partition.dereplicate()?;
rep.push(t.stop());
let t = Stage::start("count_kmer");
self.partition.count_kmer()?;
rep.push(t.stop());
Ok(())
}
/// Build the layered MPHF index for all partitions.
///
/// Default mode (`config.with_counts = false`): set membership only.
/// With counts: count matrix per kmer.
///
/// Writes `index.done` upon completion.
/// Path to the unitigs file for partition `part`, layer `layer`.
pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf {
self.partition.part_dir(part)
.join("index")
.join(format!("layer_{layer}"))
.join("unitigs.bin")
}
pub fn build_layers(
&self,
min_ab: u32,
max_ab: Option<u32>,
keep_intermediate: bool,
rep: &mut Reporter,
) -> OKIResult<()> {
let n = self.partition.n_partitions();
let t = Stage::start("index");
let with_counts = self.meta.config.with_counts;
let filter_active = min_ab > 1 || max_ab.is_some();
let need_counts = filter_active || with_counts;
let total_kmers = AtomicUsize::new(0);
let partition = &self.partition;
let pb = Arc::new(Mutex::new(
ProgressBar::new(n as u64).with_style(
ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(),
),
));
(0..n).into_par_iter().for_each(|i| {
let part_dir = partition.part_dir(i);
let dedup_path = part_dir.join("dereplicated.skmer.zst");
if !dedup_path.exists() {
return;
}
let layer_dir = part_dir.join("index").join("layer_0");
if layer_dir.join("mphf.bin").exists() {
return;
}
let mphf1_opt: Option<Mphf> = if need_counts {
let p = part_dir.join("mphf1.bin");
p.exists().then(|| Mphf::load_full(&p).ok()).flatten()
} else {
None
};
let counts1_opt: Option<PersistentCompactIntVec> = if need_counts {
let p = part_dir.join("counts1.bin");
p.exists()
.then(|| PersistentCompactIntVec::open(&p).ok())
.flatten()
} else {
None
};
let mut g = GraphDeBruijn::new();
let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", dedup_path.display());
std::process::exit(1);
});
for sk in reader.iter() {
for kmer in sk.iter_canonical_kmers() {
let accept = if filter_active {
match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => {
let ab = counts.get(mphf.index(&kmer.raw()));
ab >= min_ab && max_ab.map_or(true, |max| ab <= max)
}
_ => true,
}
} else {
true
};
if accept {
g.push(kmer);
}
}
}
let n_kmers = g.len();
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
g.compute_degrees();
fs::create_dir_all(&layer_dir).unwrap_or_else(|e| {
eprintln!("error creating {}: {e}", layer_dir.display());
std::process::exit(1);
});
let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| {
eprintln!("error creating unitig writer (partition {i}): {e}");
std::process::exit(1);
});
for unitig in g.iter_unitig() {
uw.write(&unitig).unwrap_or_else(|e| {
eprintln!("error writing unitig (partition {i}): {e}");
std::process::exit(1);
});
}
uw.close().unwrap_or_else(|e| {
eprintln!("error closing unitig writer (partition {i}): {e}");
std::process::exit(1);
});
if with_counts {
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, |kmer| {
match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
_ => 1,
}
})
.unwrap_or_else(|e| {
eprintln!("error building count layer (partition {i}): {e}");
std::process::exit(1);
});
} else {
Layer::<()>::build(&layer_dir).unwrap_or_else(|e| {
eprintln!("error building set layer (partition {i}): {e}");
std::process::exit(1);
});
}
let pb = pb.lock().unwrap();
pb.inc(1);
pb.set_message(format!("{i}: {n_kmers} kmers"));
});
pb.lock().unwrap().finish_and_clear();
info!(
"done — {} total kmers indexed",
total_kmers.load(Ordering::Relaxed)
);
if !keep_intermediate {
for i in 0..n {
let part_dir = partition.part_dir(i);
remove_if_exists(&part_dir.join("dereplicated.skmer.zst"));
remove_if_exists(&SKFileMeta::sidecar_path(
&part_dir.join("dereplicated.skmer.zst"),
));
remove_if_exists(&part_dir.join("mphf1.bin"));
remove_if_exists(&part_dir.join("counts1.bin"));
}
}
touch(&self.root_path.join(SENTINEL_INDEXED))?;
rep.push(t.stop());
Ok(())
}
}
/// Derive a genome label from a file path: filename stripped of all extensions.
fn label_from_path(path: &Path) -> String {
let name = path
.file_name()
.unwrap_or(path.as_os_str())
.to_string_lossy()
.into_owned();
let mut s = name;
while let Some(pos) = s.rfind('.') {
s.truncate(pos);
}
if s.is_empty() { "unknown".to_string() } else { s }
}
fn touch(path: &Path) -> Result<(), std::io::Error> {
fs::File::create(path).map(|_| ())
}
fn remove_if_exists(path: &Path) {
if let Err(e) = fs::remove_file(path) {
if e.kind() != std::io::ErrorKind::NotFound {
eprintln!("warning: could not remove {}: {e}", path.display());
}
}
}
+9
View File
@@ -0,0 +1,9 @@
pub mod error;
pub mod meta;
pub mod state;
mod index;
pub use error::{OKIError, OKIResult};
pub use index::KmerIndex;
pub use meta::{IndexConfig, IndexMeta, META_FILENAME};
pub use state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
+45
View File
@@ -0,0 +1,45 @@
use std::fs;
use std::io;
use std::path::Path;
use serde::{Deserialize, Serialize};
pub const META_FILENAME: &str = "index.meta";
const META_VERSION: u32 = 1;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexConfig {
pub kmer_size: usize,
pub minimizer_size: usize,
pub n_bits: usize,
pub with_counts: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMeta {
pub version: u32,
pub config: IndexConfig,
/// Ordered list of genome labels indexed here.
/// Element 0 is the initial genome; subsequent entries come from merges.
pub genomes: Vec<String>,
}
impl IndexMeta {
pub fn new(config: IndexConfig) -> Self {
Self { version: META_VERSION, config, genomes: Vec::new() }
}
pub fn write(&self, root: &Path) -> io::Result<()> {
let file = fs::File::create(root.join(META_FILENAME))?;
serde_json::to_writer_pretty(file, self).map_err(io::Error::other)
}
pub fn read(root: &Path) -> io::Result<Self> {
let file = fs::File::open(root.join(META_FILENAME))?;
serde_json::from_reader(file).map_err(io::Error::other)
}
pub fn exists(root: &Path) -> bool {
root.join(META_FILENAME).exists()
}
}
+45
View File
@@ -0,0 +1,45 @@
use std::path::Path;
use crate::meta::META_FILENAME;
pub const SENTINEL_SCATTERED: &str = "scatter.done";
pub const SENTINEL_COUNTED: &str = "kmer_spectrum_raw.json";
pub const SENTINEL_INDEXED: &str = "index.done";
/// Progression state of a `KmerIndex`.
///
/// Variants are ordered: `Empty < Scattered < Counted < Indexed`.
/// A state is reported only when its sentinel file is fully present —
/// partial states (e.g. scatter interrupted mid-way) are not accepted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexState {
/// `index.meta` present; scatter not yet completed.
Empty,
/// `scatter.done` sentinel present — all super-kmers have been routed.
Scattered,
/// `kmer_spectrum_raw.json` present — dereplicate + count complete.
Counted,
/// `index.done` sentinel present — layered MPHF index fully built.
Indexed,
}
impl IndexState {
/// Detect the state of the index at `root`.
///
/// Returns `None` if `index.meta` is absent (not an obikindex directory).
pub fn detect(root: &Path) -> Option<Self> {
if !root.join(META_FILENAME).exists() {
return None;
}
if root.join(SENTINEL_INDEXED).exists() {
return Some(Self::Indexed);
}
if root.join(SENTINEL_COUNTED).exists() {
return Some(Self::Counted);
}
if root.join(SENTINEL_SCATTERED).exists() {
return Some(Self::Scattered);
}
Some(Self::Empty)
}
}
+4 -5
View File
@@ -13,15 +13,14 @@ obiread = { path = "../obiread" }
obiskbuilder = { path = "../obiskbuilder" } obiskbuilder = { path = "../obiskbuilder" }
obifastwrite = { path = "../obifastwrite" } obifastwrite = { path = "../obifastwrite" }
obipipeline = { path = "../obipipeline" } obipipeline = { path = "../obipipeline" }
obidebruinj = { path = "../obidebruinj" }
clap = { version = "4", features = ["derive"] }
obikrope = { path = "../obikrope" } obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" } obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" } obiskio = { path = "../obiskio" }
niffler = "3" obikindex = { path = "../obikindex" }
clap = { version = "4", features = ["derive"] }
rayon = "1" rayon = "1"
ph = "0.11" indicatif = "0.17"
memmap2 = "0.9"
tracing = "0.1.44" tracing = "0.1.44"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
pprof = { version = "0.13", features = ["prost-codec"], optional = true } pprof = { version = "0.13", features = ["prost-codec"], optional = true }
-24
View File
@@ -1,24 +0,0 @@
use clap::Args;
use obikpartitionner::KmerPartition;
use std::path::PathBuf;
use tracing::info;
#[derive(Args)]
pub struct CountArgs {
/// Partition directory produced by the `partition` command
#[arg(short, long)]
pub partition: PathBuf,
}
pub fn run(args: CountArgs) {
let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1)
});
info!("counting kmers in {}", args.partition.display());
kp.count_kmer().unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1)
});
}
-84
View File
@@ -1,84 +0,0 @@
use std::fs::File;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use clap::Args;
use niffler::Level;
use niffler::send::compression::Format;
use obifastwrite::write_count;
use obikpartitionner::KmerPartition;
use obiskio::SKFileReader;
use rayon::prelude::*;
use tracing::info;
#[derive(Args)]
pub struct FastaArgs {
/// Root of the k-mer partition directory (produced by the `partition` command)
pub partition: PathBuf,
/// Dump dereplicated super-kmers as FASTA (→ <partition>/dereplicated.skmer.fasta.gz)
#[arg(long)]
pub super_kmers: bool,
}
pub fn run(args: FastaArgs) {
if !args.super_kmers {
eprintln!("error: specify at least one output mode (--super-kmers)");
std::process::exit(1);
}
let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| {
eprintln!("error opening partition: {e}");
std::process::exit(1)
});
if args.super_kmers {
dump_super_kmers(&kp, &args.partition);
}
}
fn dump_super_kmers(kp: &KmerPartition, partition_dir: &PathBuf) {
let k = kp.kmer_size();
let m = kp.minimizer_size();
let n = kp.n_partitions();
info!("writing {n} partition FASTA files (parallel)");
let total = AtomicUsize::new(0);
(0..n).into_par_iter().for_each(|i| {
let part_dir = partition_dir.join(format!("part_{i:05}"));
let in_path = part_dir.join("dereplicated.skmer.zst");
if !in_path.exists() {
return;
}
let out_path = part_dir.join("dereplicated.skmer.fasta.gz");
let file = File::create(&out_path).unwrap_or_else(|e| {
eprintln!("error creating {}: {e}", out_path.display());
std::process::exit(1)
});
let mut writer = niffler::send::get_writer(Box::new(file), Format::Gzip, Level::Six)
.unwrap_or_else(|e| {
eprintln!("error creating gzip writer: {e}");
std::process::exit(1)
});
let mut reader = SKFileReader::open(&in_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", in_path.display());
std::process::exit(1)
});
let mut count = 0usize;
for sk in reader.iter() {
write_count(&sk, &mut writer, k, m, i as u32).unwrap_or_else(|e| {
eprintln!("write error: {e}");
std::process::exit(1)
});
count += 1;
}
info!("partition {i}: {count} super-kmers → {}", out_path.display());
total.fetch_add(count, Ordering::Relaxed);
});
info!("wrote {} super-kmers total", total.load(Ordering::Relaxed));
}
+117
View File
@@ -0,0 +1,117 @@
use std::path::PathBuf;
use clap::Args;
use obikindex::{IndexConfig, IndexState, KmerIndex};
use obikseq::{set_k, set_m};
use obisys::Reporter;
use tracing::info;
use crate::cli::CommonArgs;
use crate::steps::scatter;
#[derive(Args)]
pub struct IndexArgs {
/// Output index directory
#[arg(short, long)]
pub output: PathBuf,
/// Overwrite output directory if it already exists
#[arg(long, default_value_t = false)]
pub force: bool,
/// Genome label (default: input filename without path/extension)
#[arg(long)]
pub label: Option<String>,
/// Minimum kmer abundance (inclusive)
#[arg(long, default_value_t = 1)]
pub min_abundance: u32,
/// Maximum kmer abundance (inclusive)
#[arg(long)]
pub max_abundance: Option<u32>,
/// Store kmer counts in the index (default: set membership only)
#[arg(long, default_value_t = false)]
pub with_counts: bool,
/// Keep intermediate build files (dereplicated superkmers, mphf1, counts1)
#[arg(long, default_value_t = false)]
pub keep_intermediate: bool,
#[command(flatten)]
pub common: CommonArgs,
}
pub fn run(args: IndexArgs) {
let output = args.output.clone();
let mut rep = Reporter::new();
// ── Open or create the index ─────────────────────────────────────────────
let mut idx = if KmerIndex::exists(&output) {
info!("resuming from existing index at {}", output.display());
KmerIndex::open(&output).unwrap_or_else(|e| {
eprintln!("error opening index: {e}");
std::process::exit(1);
})
} else {
let config = IndexConfig {
kmer_size: args.common.kmer_size,
minimizer_size: args.common.minimizer_size,
n_bits: args.common.partition_bits,
with_counts: args.with_counts,
};
KmerIndex::create(&output, config, args.label.clone(), args.force).unwrap_or_else(|e| {
eprintln!("error creating index: {e}");
std::process::exit(1);
})
};
set_k(idx.kmer_size());
set_m(idx.minimizer_size());
// ── Stage 1: scatter ─────────────────────────────────────────────────────
if idx.state() < IndexState::Scattered {
let first_path = args.common.inputs.first().map(PathBuf::from);
let k = idx.kmer_size();
let level_max = args.common.level_max;
let theta = args.common.theta;
let n_workers = args.common.threads.max(1);
scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep);
idx.mark_scattered(first_path.as_deref()).unwrap_or_else(|e| {
eprintln!("error marking scatter done: {e}");
std::process::exit(1);
});
} else {
info!("scatter already done, skipping");
}
// ── Stage 2: dereplicate + count ─────────────────────────────────────────
if idx.state() < IndexState::Counted {
idx.dereplicate_and_count(&mut rep).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1);
});
} else {
info!("dereplicate+count already done, skipping");
}
// ── Stage 3: build layered index ─────────────────────────────────────────
if idx.state() < IndexState::Indexed {
idx.build_layers(
args.min_abundance,
args.max_abundance,
args.keep_intermediate,
&mut rep,
).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1);
});
} else {
info!("index already built, skipping");
}
rep.print();
}
-143
View File
@@ -1,143 +0,0 @@
use std::fs::File;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use clap::Args;
use niffler::Level;
use niffler::send::compression::Format;
use obidebruinj::GraphDeBruijn;
use obikpartitionner::KmerPartition;
use obikseq::set_k;
use obiskio::SKFileReader;
use ph::fmph::GOFunction;
use rayon::prelude::*;
use tracing::info;
#[derive(Args)]
pub struct LongtigArgs {
/// Root of the k-mer partition directory (produced by the `partition` command)
pub partition: PathBuf,
/// Minimum kmer abundance (inclusive); kmers below this threshold are excluded
#[arg(long, default_value_t = 1)]
pub min_abundance: u32,
/// Maximum kmer abundance (inclusive); kmers above this threshold are excluded
#[arg(long)]
pub max_abundance: Option<u32>,
}
pub fn run(args: LongtigArgs) {
let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| {
eprintln!("error opening partition: {e}");
std::process::exit(1)
});
let k = kp.kmer_size();
set_k(k);
let n = kp.n_partitions();
info!("building longtigs from {n} partitions (k={k}, parallel)");
let total_kmers = AtomicUsize::new(0);
(0..n).into_par_iter().for_each(|i| {
let part_dir = args.partition.join(format!("part_{i:05}"));
let in_path = part_dir.join("dereplicated.skmer.zst");
if !in_path.exists() {
return;
}
let out_path = part_dir.join("longtig.fasta.gz");
let mut g = GraphDeBruijn::new();
let mphf_path = part_dir.join("mphf1.bin");
let counts_path = part_dir.join("counts1.bin");
let filter_active = (args.min_abundance > 1 || args.max_abundance.is_some())
&& mphf_path.exists()
&& counts_path.exists();
let mphf_opt: Option<GOFunction> = if filter_active {
let mut f = File::open(&mphf_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", mphf_path.display());
std::process::exit(1)
});
Some(GOFunction::read(&mut f).unwrap_or_else(|e| {
eprintln!("error reading MPHF {}: {e}", mphf_path.display());
std::process::exit(1)
}))
} else {
None
};
let counts_mmap_opt = if filter_active {
let cf = File::open(&counts_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", counts_path.display());
std::process::exit(1)
});
Some(unsafe {
memmap2::Mmap::map(&cf).unwrap_or_else(|e| {
eprintln!("error mmapping {}: {e}", counts_path.display());
std::process::exit(1)
})
})
} else {
None
};
let counts_slice: Option<&[u32]> = counts_mmap_opt
.as_ref()
.map(|m| unsafe { std::slice::from_raw_parts(m.as_ptr() as *const u32, m.len() / 4) });
let mut reader = SKFileReader::open(&in_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", in_path.display());
std::process::exit(1)
});
for sk in reader.iter() {
for kmer in sk.iter_canonical_kmers() {
let accept = match (&mphf_opt, counts_slice) {
(Some(mphf), Some(counts)) => {
if let Some(slot) = mphf.get(&kmer) {
let ab = counts[slot as usize];
ab >= args.min_abundance
&& args.max_abundance.map_or(true, |max| ab <= max)
} else {
false
}
}
_ => true,
};
if accept {
g.push(kmer);
}
}
}
let n_kmers = g.len();
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
info!(
"partition {i}/{n}: {n_kmers} canonical k-mers → {}",
out_path.display()
);
g.compute_degrees();
let file = File::create(&out_path).unwrap_or_else(|e| {
eprintln!("error creating {}: {e}", out_path.display());
std::process::exit(1)
});
let mut writer = niffler::send::get_writer(Box::new(file), Format::Gzip, Level::Six)
.unwrap_or_else(|e| {
eprintln!("error creating gzip writer: {e}");
std::process::exit(1)
});
g.write_fasta(&mut writer, false).unwrap_or_else(|e| {
eprintln!("write error on partition {i}: {e}");
std::process::exit(1)
});
});
info!(
"done — {} total canonical k-mers across all partitions",
total_kmers.load(Ordering::Relaxed)
);
}
+1 -4
View File
@@ -1,6 +1,3 @@
pub mod count; pub mod index;
pub mod fasta;
pub mod longtig;
pub mod partition;
pub mod superkmer; pub mod superkmer;
pub mod unitig; pub mod unitig;
-61
View File
@@ -1,61 +0,0 @@
use std::path::PathBuf;
use clap::Args;
use obikpartitionner::KmerPartition;
use obikseq::{RoutableSuperKmer, set_k, set_m};
use tracing::info;
use crate::cli::{CommonArgs, PipelineData, open_chunks};
#[derive(Args)]
pub struct PartitionArgs {
/// Output partition directory
#[arg(short, long)]
pub output: PathBuf,
/// Overwrite output directory if it already exists
#[arg(long, default_value_t = false)]
pub force: bool,
#[command(flatten)]
pub common: CommonArgs,
}
// ── Entry point ───────────────────────────────────────────────────────────────
pub fn run(args: PartitionArgs) {
let k = args.common.kmer_size;
set_k(k);
let m = args.common.minimizer_size;
set_m(m);
let theta = args.common.theta;
let level_max = args.common.level_max;
let n_workers = args.common.threads.max(1);
let mut kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force)
.unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1)
});
let path_source = args.common.seqfile_paths();
let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
||? { |path| open_chunks(path) } : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
};
for batch in pipe.apply(path_source, n_workers, 1) {
kp.write_batch(batch).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1)
});
}
kp.close().expect("close error");
info!("dereplicating...");
kp.dereplicate().expect("dereplicate error");
kp.count_kmer().expect("count kmer error");
}
+4 -1
View File
@@ -3,7 +3,7 @@ use std::path::PathBuf;
use clap::Args; use clap::Args;
use obifastwrite::write_scatter; use obifastwrite::write_scatter;
use obikseq::RoutableSuperKmer; use obikseq::{RoutableSuperKmer, set_k, set_m};
use crate::cli::{CommonArgs, PipelineData, open_chunks}; use crate::cli::{CommonArgs, PipelineData, open_chunks};
@@ -41,6 +41,9 @@ pub fn run(args: SuperkmerArgs) {
let partition_bits = args.common.partition_bits; let partition_bits = args.common.partition_bits;
let n_workers = args.common.threads.max(1); let n_workers = args.common.threads.max(1);
set_k(k);
set_m(m);
let path_source = args.common.seqfile_paths(); let path_source = args.common.seqfile_paths();
let pipe = obipipeline::make_pipe! { let pipe = obipipeline::make_pipe! {
+23 -112
View File
@@ -1,143 +1,54 @@
use std::fs::File; use std::io::{self, BufWriter, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex;
use clap::Args; use clap::Args;
use niffler::Level; use obifastwrite::write_unitig;
use niffler::send::compression::Format; use obikindex::KmerIndex;
use obidebruinj::GraphDeBruijn;
use obikpartitionner::KmerPartition;
use obikseq::set_k; use obikseq::set_k;
use obiskio::SKFileReader; use obiskio::UnitigFileReader;
use ph::fmph::GOFunction;
use rayon::prelude::*; use rayon::prelude::*;
use tracing::info; use tracing::info;
#[derive(Args)] #[derive(Args)]
pub struct UnitigArgs { pub struct UnitigArgs {
/// Root of the k-mer partition directory (produced by the `partition` command) /// Index directory produced by the `index` command
pub partition: PathBuf, pub index: PathBuf,
/// Minimum kmer abundance (inclusive); kmers below this threshold are excluded
#[arg(long, default_value_t = 1)]
pub min_abundance: u32,
/// Maximum kmer abundance (inclusive); kmers above this threshold are excluded
#[arg(long)]
pub max_abundance: Option<u32>,
} }
pub fn run(args: UnitigArgs) { pub fn run(args: UnitigArgs) {
let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| { let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| {
eprintln!("error opening partition: {e}"); eprintln!("error opening index: {e}");
std::process::exit(1) std::process::exit(1)
}); });
let k = kp.kmer_size(); let k = idx.kmer_size();
set_k(k); set_k(k);
let n = kp.n_partitions(); let n = idx.n_partitions();
info!("building unitigs from {n} partitions (k={k}, parallel)"); info!("dumping unitigs from {n} partitions (k={k})");
let total_kmers = AtomicUsize::new(0); let stdout = Mutex::new(BufWriter::new(io::stdout()));
(0..n).into_par_iter().for_each(|i| { (0..n).into_par_iter().for_each(|i| {
let part_dir = args.partition.join(format!("part_{i:05}")); let path = idx.layer_unitigs_path(i, 0);
let in_path = part_dir.join("dereplicated.skmer.zst"); if !path.exists() {
if !in_path.exists() {
return; return;
} }
let out_path = part_dir.join("unitig.fasta.gz");
let mut g = GraphDeBruijn::new(); let reader = UnitigFileReader::open(&path).unwrap_or_else(|e| {
eprintln!("error opening unitigs (partition {i}): {e}");
let mphf_path = part_dir.join("mphf1.bin");
let counts_path = part_dir.join("counts1.bin");
let filter_active = (args.min_abundance > 1 || args.max_abundance.is_some())
&& mphf_path.exists()
&& counts_path.exists();
let mphf_opt: Option<GOFunction> = if filter_active {
let mut f = File::open(&mphf_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", mphf_path.display());
std::process::exit(1) std::process::exit(1)
}); });
Some(GOFunction::read(&mut f).unwrap_or_else(|e| {
eprintln!("error reading MPHF {}: {e}", mphf_path.display());
std::process::exit(1)
}))
} else {
None
};
let counts_mmap_opt = if filter_active { for j in 0..reader.len() {
let cf = File::open(&counts_path).unwrap_or_else(|e| { let unitig = reader.unitig(j);
eprintln!("error opening {}: {e}", counts_path.display()); let mut out = stdout.lock().unwrap();
write_unitig(&unitig, k, i, j, &mut *out).unwrap_or_else(|e| {
eprintln!("write error: {e}");
std::process::exit(1) std::process::exit(1)
}); });
Some(unsafe {
memmap2::Mmap::map(&cf).unwrap_or_else(|e| {
eprintln!("error mmapping {}: {e}", counts_path.display());
std::process::exit(1)
})
})
} else {
None
};
let counts_slice: Option<&[u32]> = counts_mmap_opt
.as_ref()
.map(|m| unsafe { std::slice::from_raw_parts(m.as_ptr() as *const u32, m.len() / 4) });
let mut reader = SKFileReader::open(&in_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", in_path.display());
std::process::exit(1)
});
for sk in reader.iter() {
for kmer in sk.iter_canonical_kmers() {
let accept = match (&mphf_opt, counts_slice) {
(Some(mphf), Some(counts)) => {
if let Some(slot) = mphf.get(&kmer) {
let ab = counts[slot as usize];
ab >= args.min_abundance
&& args.max_abundance.map_or(true, |max| ab <= max)
} else {
false
} }
}
_ => true,
};
if accept {
g.push(kmer);
}
}
}
let n_kmers = g.len();
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
info!(
"partition {i}/{n}: {n_kmers} canonical k-mers → {}",
out_path.display()
);
g.compute_degrees();
let file = File::create(&out_path).unwrap_or_else(|e| {
eprintln!("error creating {}: {e}", out_path.display());
std::process::exit(1)
});
let mut writer = niffler::send::get_writer(Box::new(file), Format::Gzip, Level::Six)
.unwrap_or_else(|e| {
eprintln!("error creating gzip writer: {e}");
std::process::exit(1)
});
g.write_fasta(&mut writer, true).unwrap_or_else(|e| {
eprintln!("write error on partition {i}: {e}");
std::process::exit(1)
});
}); });
info!( stdout.into_inner().unwrap().flush().expect("flush error");
"done — {} total canonical k-mers across all partitions",
total_kmers.load(Ordering::Relaxed)
);
} }
+6 -14
View File
@@ -1,5 +1,6 @@
mod cli; mod cli;
mod cmd; mod cmd;
mod steps;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use tracing_subscriber::{EnvFilter, fmt}; use tracing_subscriber::{EnvFilter, fmt};
@@ -13,18 +14,12 @@ struct Cli {
#[derive(Subcommand)] #[derive(Subcommand)]
enum Commands { enum Commands {
/// Extract super-kmers from a sequence file (scatter phase) /// Extract super-kmers from a sequence file and write to stdout
Superkmer(cmd::superkmer::SuperkmerArgs), Superkmer(cmd::superkmer::SuperkmerArgs),
/// Partition super-kmers on disk by minimizer /// Build the complete genome index (scatter → dereplicate → count → layered MPHF)
Partition(cmd::partition::PartitionArgs), Index(cmd::index::IndexArgs),
/// Count kmers from an existing dereplicated partition directory /// Dump unitigs from a built index to stdout (debug)
Count(cmd::count::CountArgs),
/// Export partition data to FASTA (--super-kmers: dereplicated super-kmers)
Fasta(cmd::fasta::FastaArgs),
/// Build de Bruijn unitigs for all partitions and write to unitig.fasta.gz
Unitig(cmd::unitig::UnitigArgs), Unitig(cmd::unitig::UnitigArgs),
/// Build de Bruijn longtigs for all partitions and write to longtig.fasta.gz
Longtig(cmd::longtig::LongtigArgs),
} }
fn main() { fn main() {
@@ -47,11 +42,8 @@ fn main() {
let cli = Cli::parse(); let cli = Cli::parse();
match cli.command { match cli.command {
Commands::Superkmer(args) => cmd::superkmer::run(args), Commands::Superkmer(args) => cmd::superkmer::run(args),
Commands::Partition(args) => cmd::partition::run(args), Commands::Index(args) => cmd::index::run(args),
Commands::Count(args) => cmd::count::run(args),
Commands::Fasta(args) => cmd::fasta::run(args),
Commands::Unitig(args) => cmd::unitig::run(args), Commands::Unitig(args) => cmd::unitig::run(args),
Commands::Longtig(args) => cmd::longtig::run(args),
} }
#[cfg(feature = "profiling")] #[cfg(feature = "profiling")]
+3
View File
@@ -0,0 +1,3 @@
mod scatter;
pub use scatter::scatter;
+71
View File
@@ -0,0 +1,71 @@
use std::path::PathBuf;
use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle};
use obikpartitionner::KmerPartition;
use obisys::{Reporter, Stage};
use crate::cli::{PipelineData, open_chunks};
/// Run scatter: normalise → build superkmers → route to partition → close.
/// Reports the "scatter" stage to `rep`.
pub fn scatter(
kp: &mut KmerPartition,
path_source: obiread::PathIter,
k: usize,
level_max: usize,
theta: f64,
n_workers: usize,
rep: &mut Reporter,
) {
use obikseq::RoutableSuperKmer;
let t = Stage::start("scatter");
let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
||? { |path| open_chunks(path) } : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
};
let pb = ProgressBar::new_spinner();
pb.set_style(
ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}")
.unwrap()
.tick_strings(&["", "", "", "", "", "", "", "", "", ""]),
);
pb.enable_steady_tick(Duration::from_millis(100));
let mut total_bases: u64 = 0;
let mut ema_rate: f64 = 0.0;
let mut last_t = Instant::now();
let mut last_bases: u64 = 0;
const ALPHA: f64 = 0.15;
for batch in pipe.apply(path_source, n_workers, 1) {
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
let now = Instant::now();
let dt = now.duration_since(last_t).as_secs_f64();
if dt > 0.1 {
let instant = (total_bases - last_bases) as f64 / dt;
ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate;
last_t = now;
last_bases = total_bases;
let bp = total_bases as f64;
let (count_str, rate_str) = if bp >= 1e9 {
(format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6))
} else {
(format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6))
};
pb.set_message(format!("{count_str} {rate_str}"));
}
kp.write_batch(batch).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1);
});
}
pb.finish_and_clear();
kp.close().expect("close error");
rep.push(t.stop());
}
+1
View File
@@ -25,3 +25,4 @@ epserde = "0.8"
memmap2 = "0.9.10" memmap2 = "0.9.10"
obicompactvec = { path = "../obicompactvec" } obicompactvec = { path = "../obicompactvec" }
ptr_hash = "1.1" ptr_hash = "1.1"
indicatif = "0.17"
+1 -1
View File
@@ -1,4 +1,4 @@
mod kmer_sort; mod kmer_sort;
mod partition; mod partition;
pub use partition::KmerPartition; pub use partition::{KmerPartition, PARTITIONS_SUBDIR};
+40 -17
View File
@@ -2,7 +2,10 @@ use std::collections::{BTreeMap, HashMap};
use std::fs; use std::fs;
use std::io; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tracing::{debug, info}; use std::time::Instant;
use tracing::debug;
use indicatif::{ProgressBar, ProgressStyle};
use cacheline_ef::{CachelineEf, CachelineEfVec}; use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::ser::Serialize as EpSerialize; use epserde::ser::Serialize as EpSerialize;
@@ -27,6 +30,7 @@ type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u
const META_FILENAME: &str = "partition.meta"; const META_FILENAME: &str = "partition.meta";
const SK_EXT: &str = "skmer.zst"; const SK_EXT: &str = "skmer.zst";
pub const PARTITIONS_SUBDIR: &str = "partitions";
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct PartitionMeta { struct PartitionMeta {
@@ -55,7 +59,7 @@ impl KmerPartition {
minimizer_size: usize, minimizer_size: usize,
force: bool, force: bool,
) -> SKResult<Self> { ) -> SKResult<Self> {
Self::create_with(path, n_bits, kmer_size, minimizer_size, Level::Three, force) Self::create_with(path, n_bits, kmer_size, minimizer_size, Level::One, force)
} }
pub fn create_with<P: AsRef<Path>>( pub fn create_with<P: AsRef<Path>>(
@@ -81,7 +85,7 @@ impl KmerPartition {
.into()); .into());
} }
} }
fs::create_dir_all(&root_path)?; fs::create_dir_all(root_path.join(PARTITIONS_SUBDIR))?;
let n_partitions = 1usize << n_bits; let n_partitions = 1usize << n_bits;
let writers = (0..n_partitions).map(|_| None).collect(); let writers = (0..n_partitions).map(|_| None).collect();
let partition = Self { let partition = Self {
@@ -172,6 +176,11 @@ impl KmerPartition {
&self.root_path &self.root_path
} }
/// Path of partition `i` directory.
pub fn part_dir(&self, i: usize) -> PathBuf {
self.root_path.join(PARTITIONS_SUBDIR).join(format!("part_{i:05}"))
}
pub fn kmer_size(&self) -> usize { pub fn kmer_size(&self) -> usize {
self.kmer_size self.kmer_size
} }
@@ -205,7 +214,6 @@ impl KmerPartition {
/// more temporary file descriptors — all managed by the global fd pool. /// more temporary file descriptors — all managed by the global fd pool.
pub fn dereplicate(&self) -> SKResult<()> { pub fn dereplicate(&self) -> SKResult<()> {
let level = self.level; let level = self.level;
let root = &self.root_path;
let sys = System::new_all(); let sys = System::new_all();
// available_memory() can return 0 on macOS when the compressor page count exceeds // available_memory() can return 0 on macOS when the compressor page count exceeds
// free+inactive+purgeable pages (sysinfo saturating_sub). Fall back to half of total. // free+inactive+purgeable pages (sysinfo saturating_sub). Fall back to half of total.
@@ -219,7 +227,7 @@ impl KmerPartition {
let results: Vec<SKResult<()>> = (0..self.n_partitions) let results: Vec<SKResult<()>> = (0..self.n_partitions)
.into_par_iter() .into_par_iter()
.map(|i| { .map(|i| {
let dir = root.join(format!("part_{:05}", i)); let dir = self.part_dir(i);
if !dir.exists() { if !dir.exists() {
return Ok(()); return Ok(());
} }
@@ -246,8 +254,6 @@ impl KmerPartition {
/// Partitions are processed in parallel via Rayon (one task per thread). /// Partitions are processed in parallel via Rayon (one task per thread).
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously. /// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
pub fn count_kmer(&self) -> SKResult<()> { pub fn count_kmer(&self) -> SKResult<()> {
let root = &self.root_path;
let sys = System::new_all(); let sys = System::new_all();
let available = match sys.available_memory() { let available = match sys.available_memory() {
0 => sys.total_memory() / 2, 0 => sys.total_memory() / 2,
@@ -256,18 +262,33 @@ impl KmerPartition {
let n_threads = rayon::current_num_threads().max(1) as u64; let n_threads = rayon::current_num_threads().max(1) as u64;
let chunk_kmers = chunk_size_from_ram(available / n_threads); let chunk_kmers = chunk_size_from_ram(available / n_threads);
let pb = ProgressBar::new(self.n_partitions as u64);
pb.set_style(
ProgressStyle::with_template(
"counting [{bar:40}] {pos}/{len} ({percent}%) {per_sec} eta {eta} {msg}",
)
.unwrap()
.progress_chars("█▌░"),
);
let results: Vec<SKResult<()>> = (0..self.n_partitions) let results: Vec<SKResult<()>> = (0..self.n_partitions)
.into_par_iter() .into_par_iter()
.map(|i| { .map(|i| {
let dir = root.join(format!("part_{:05}", i)); let dir = self.part_dir(i);
let dedup_path = dir.join(format!("dereplicated.{SK_EXT}")); let dedup_path = dir.join(format!("dereplicated.{SK_EXT}"));
if !dedup_path.exists() { if !dedup_path.exists() {
pb.inc(1);
return Ok(()); return Ok(());
} }
info!("counting kmers in partition {}/{}", i, self.n_partitions); let t = Instant::now();
count_partition(&dir, &dedup_path, chunk_kmers) let result = count_partition(&dir, &dedup_path, chunk_kmers);
pb.set_message(format!("last {:.0}ms", t.elapsed().as_millis()));
pb.inc(1);
result
}) })
.collect(); .collect();
pb.finish_and_clear();
for r in results { for r in results {
r?; r?;
} }
@@ -278,9 +299,7 @@ impl KmerPartition {
let mut global_f1: u64 = 0; let mut global_f1: u64 = 0;
for i in 0..self.n_partitions { for i in 0..self.n_partitions {
let path = root let path = self.part_dir(i).join("kmer_spectrum_raw.json");
.join(format!("part_{:05}", i))
.join("kmer_spectrum_raw.json");
if !path.exists() { if !path.exists() {
continue; continue;
} }
@@ -302,7 +321,7 @@ impl KmerPartition {
.map(|(&c, &f)| (format!("{c:010}"), f)) .map(|(&c, &f)| (format!("{c:010}"), f))
.collect(); .collect();
serde_json::to_writer_pretty( serde_json::to_writer_pretty(
fs::File::create(root.join("kmer_spectrum_raw.json"))?, fs::File::create(self.root_path.join("kmer_spectrum_raw.json"))?,
&serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }), &serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }),
) )
.map_err(io::Error::other)?; .map_err(io::Error::other)?;
@@ -334,7 +353,7 @@ impl KmerPartition {
fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> { fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> {
if self.writers[partition].is_none() { if self.writers[partition].is_none() {
let dir = self.root_path.join(format!("part_{:05}", partition)); let dir = self.root_path.join(PARTITIONS_SUBDIR).join(format!("part_{:05}", partition));
fs::create_dir_all(&dir)?; fs::create_dir_all(&dir)?;
let file_path = dir.join(format!("raw.{SK_EXT}")); let file_path = dir.join(format!("raw.{SK_EXT}"));
let writer = SKFileWriter::create_with(file_path, Format::Zstd, self.level)?; let writer = SKFileWriter::create_with(file_path, Format::Zstd, self.level)?;
@@ -509,7 +528,11 @@ fn build_mphf(unique_path: &Path, f0: usize) -> io::Result<Mphf> {
let kmers: &[u64] = unsafe { let kmers: &[u64] = unsafe {
std::slice::from_raw_parts(mmap.as_ptr() as *const u64, f0) std::slice::from_raw_parts(mmap.as_ptr() as *const u64, f0)
}; };
Ok(Mphf::new_from_par_iter(f0, kmers.par_iter().copied(), PtrHashParams::<CubicEps>::default())) // Sequential constructor: the outer par_iter over partitions already saturates
// the Rayon pool. new_from_par_iter would get no additional threads and adds
// coordination overhead. try_new accesses the same mmap'd pages at zero extra cost.
Mphf::try_new(kmers, PtrHashParams::<CubicEps>::default())
.ok_or_else(|| io::Error::other("ptr_hash construction failed"))
} }
fn count_partition(dir: &Path, dedup_path: &Path, chunk_kmers: usize) -> SKResult<()> { fn count_partition(dir: &Path, dedup_path: &Path, chunk_kmers: usize) -> SKResult<()> {
@@ -623,7 +646,7 @@ mod tests {
kp.close().unwrap(); kp.close().unwrap();
kp.dereplicate().unwrap(); kp.dereplicate().unwrap();
let part_dir = dir.path().join("part_00000"); let part_dir = dir.path().join(PARTITIONS_SUBDIR).join("part_00000");
let dedup_path = part_dir.join("dereplicated.skmer.zst"); let dedup_path = part_dir.join("dereplicated.skmer.zst");
if !dedup_path.exists() { if !dedup_path.exists() {
return (0, 0); return (0, 0);
+1 -1
View File
@@ -118,7 +118,7 @@ pub struct KmerOf<L: KmerLength>(u64, PhantomData<L>);
impl<L: KmerLength> KmerOf<L> { impl<L: KmerLength> KmerOf<L> {
/// Wrap a raw left-aligned u64 value as a kmer. /// Wrap a raw left-aligned u64 value as a kmer.
#[inline] #[inline]
pub fn from_raw(raw: u64) -> Self { pub const fn from_raw(raw: u64) -> Self {
KmerOf(raw, PhantomData) KmerOf(raw, PhantomData)
} }
+145
View File
@@ -0,0 +1,145 @@
use std::fs;
use std::path::PathBuf;
const K_MAX: usize = 32;
const WS_MAX: usize = 6;
fn normalize_circular(kmer: u64, ws: usize) -> u64 {
let mask = (1u64 << (ws * 2)) - 1;
let mut canonical = kmer & mask;
let mut current = canonical;
for _ in 0..ws - 1 {
let top = (current >> ((ws - 1) * 2)) & 3;
current = ((current << 2) | top) & mask;
if current < canonical {
canonical = current;
}
}
canonical
}
fn revcomp_raw(x: u64, k: usize) -> u64 {
let x = !x;
let x = x.swap_bytes();
let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4);
let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2);
x << (64 - 2 * k)
}
fn build_normalized_kmer(k: usize) -> Vec<u64> {
let n = 1usize << (k * 2);
let shift = 64 - k * 2;
let mut result = vec![0u64; n];
for i in 0..n {
let la = (i as u64) << shift;
let ra = i as u64;
let rc_ra = revcomp_raw(la, k) >> shift;
let circ = normalize_circular(ra, k);
let circ_rc = normalize_circular(rc_ra, k);
result[i] = if circ < circ_rc { circ } else { circ_rc };
}
result
}
fn build_ln_class(norm: &[u64]) -> Vec<f64> {
let n = norm.len();
let mut sizes = vec![0u32; n];
for &c in norm {
sizes[c as usize] += 1;
}
norm.iter()
.map(|&c| {
let s = sizes[c as usize];
if s > 0 { (s as f64).ln() } else { 0.0 }
})
.collect()
}
fn build_n_log_n() -> [f64; K_MAX + 1] {
let mut t = [0.0f64; K_MAX + 1];
for n in 1..=K_MAX {
t[n] = (n as f64) * (n as f64).ln();
}
t
}
fn build_emax() -> [[f64; WS_MAX + 1]; K_MAX + 1] {
let mut t = [[0.0f64; WS_MAX + 1]; K_MAX + 1];
for k in 2..=K_MAX {
for ws in 1..=WS_MAX.min(k - 1) {
let n_raw = 1usize << (ws * 2);
let nwords = k - ws + 1;
let c = nwords / n_raw;
let r = nwords % n_raw;
let nf = nwords as f64;
let t1 = if c == 0 || n_raw == r {
0.0
} else {
let f1 = c as f64 / nf;
(n_raw - r) as f64 * f1 * f1.ln()
};
let t2 = if r == 0 {
0.0
} else {
let f2 = (c + 1) as f64 / nf;
r as f64 * f2 * f2.ln()
};
t[k][ws] = -(t1 + t2);
}
}
t
}
fn build_log_nwords() -> [[f64; WS_MAX + 1]; K_MAX + 1] {
let mut t = [[0.0f64; WS_MAX + 1]; K_MAX + 1];
for k in 2..=K_MAX {
for ws in 1..=WS_MAX.min(k - 1) {
t[k][ws] = ((k - ws + 1) as f64).ln();
}
}
t
}
fn emit_f64_1d(out: &mut String, name: &str, n: usize, values: &[f64]) {
out.push_str(&format!("pub(crate) const {name}: [f64; {n}] = [\n"));
for v in values {
out.push_str(&format!(" {v:?},\n"));
}
out.push_str("];\n");
}
fn emit_f64_2d(out: &mut String, name: &str, rows: usize, cols: usize, values: &[[f64; WS_MAX + 1]]) {
out.push_str(&format!("pub(crate) const {name}: [[f64; {cols}]; {rows}] = [\n"));
for row in values {
out.push_str(" [");
for (i, v) in row.iter().enumerate() {
if i > 0 { out.push_str(", "); }
out.push_str(&format!("{v:?}"));
}
out.push_str("],\n");
}
out.push_str("];\n");
}
fn main() {
let out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap());
let mut out = String::new();
for k in 1..=6usize {
let n = 1usize << (k * 2);
let norm = build_normalized_kmer(k);
let ln_class = build_ln_class(&norm);
emit_f64_1d(&mut out, &format!("LN_CLASS{k}"), n, &ln_class);
}
let n_log_n = build_n_log_n();
emit_f64_1d(&mut out, "N_LOG_N", K_MAX + 1, &n_log_n);
let emax = build_emax();
emit_f64_2d(&mut out, "EMAX", K_MAX + 1, WS_MAX + 1, &emax);
let log_nwords = build_log_nwords();
emit_f64_2d(&mut out, "LOG_NWORDS", K_MAX + 1, WS_MAX + 1, &log_nwords);
fs::write(out_dir.join("ln_class_tables.rs"), out).unwrap();
}
+63 -128
View File
@@ -1,173 +1,108 @@
use obikseq::kmer::Kmer; pub(crate) const NORMK1: [u64; 4] = build_normalized_kmer::<4>();
use std::sync::LazyLock; pub(crate) const NORMK2: [u64; 16] = build_normalized_kmer::<16>();
pub(crate) const NORMK3: [u64; 64] = build_normalized_kmer::<64>();
pub(crate) const NORMK4: [u64; 256] = build_normalized_kmer::<256>();
pub(crate) const NORMK5: [u64; 1024] = build_normalized_kmer::<1024>();
pub(crate) const NORMK6: [u64; 4096] = build_normalized_kmer::<4096>();
pub(crate) static NORMK1: LazyLock<[u64; 4]> = LazyLock::new(|| build_normalized_kmer::<4>()); include!(concat!(env!("OUT_DIR"), "/ln_class_tables.rs"));
pub(crate) static NORMK2: LazyLock<[u64; 16]> = LazyLock::new(|| build_normalized_kmer::<16>());
pub(crate) static NORMK3: LazyLock<[u64; 64]> = LazyLock::new(|| build_normalized_kmer::<64>());
pub(crate) static NORMK4: LazyLock<[u64; 256]> = LazyLock::new(|| build_normalized_kmer::<256>());
pub(crate) static NORMK5: LazyLock<[u64; 1024]> = LazyLock::new(|| build_normalized_kmer::<1024>());
pub(crate) static NORMK6: LazyLock<[u64; 4096]> = LazyLock::new(|| build_normalized_kmer::<4096>());
pub(crate) static LN_CARD_ROT1: LazyLock<[f64; 4]> = const fn normalize_circular(kmer: u64, ws: usize) -> u64 {
LazyLock::new(|| build_log_class_size::<4>(&NORMK1));
pub(crate) static LN_CARD_ROT2: LazyLock<[f64; 16]> =
LazyLock::new(|| build_log_class_size::<16>(&NORMK2));
pub(crate) static LN_CARD_ROT3: LazyLock<[f64; 64]> =
LazyLock::new(|| build_log_class_size::<64>(&NORMK3));
pub(crate) static LN_CARD_ROT4: LazyLock<[f64; 256]> =
LazyLock::new(|| build_log_class_size::<256>(&NORMK4));
pub(crate) static LN_CARD_ROT5: LazyLock<[f64; 1024]> =
LazyLock::new(|| build_log_class_size::<1024>(&NORMK5));
pub(crate) static LN_CARD_ROT6: LazyLock<[f64; 4096]> =
LazyLock::new(|| build_log_class_size::<4096>(&NORMK6));
fn ln0(x: f64) -> f64 {
if x == 0.0 { 0.0 } else { x.ln() }
}
fn normalize_circular(kmer: u64, ws: usize) -> u64 {
let mask = (1u64 << (ws * 2)) - 1; let mask = (1u64 << (ws * 2)) - 1;
let mut canonical = kmer & mask; let mut canonical = kmer & mask;
let mut current = canonical; let mut current = canonical;
for _ in 0..ws - 1 { let mut i = 0;
while i < (ws - 1) {
let top = (current >> ((ws - 1) * 2)) & 3; let top = (current >> ((ws - 1) * 2)) & 3;
current = ((current << 2) | top) & mask; current = ((current << 2) | top) & mask;
if current < canonical { if current < canonical {
canonical = current; canonical = current;
} }
i += 1;
} }
canonical canonical
} }
fn build_normalized_kmer<const N: usize>() -> [u64; N] { const fn build_normalized_kmer<const N: usize>() -> [u64; N] {
let mut result = [0u64; N]; let mut result = [0u64; N];
let k = N.ilog(4) as usize; let k = k_from_n::<N>();
let shift = 64 - k * 2; let shift = 64 - k * 2;
for i in 0..N { let mut i = 0;
while i < N {
let la = (i as u64) << shift; let la = (i as u64) << shift;
let ra = i as u64; let ra = i as u64;
let rc_ra = Kmer::from_raw(la).revcomp().raw() >> shift; let rc_ra = revcomp_raw(la, k) >> shift;
let circ = normalize_circular(ra, k); let circ = normalize_circular(ra, k);
let circ_rc = normalize_circular(rc_ra, k); let circ_rc = normalize_circular(rc_ra, k);
result[i] = circ.min(circ_rc); result[i] = if circ < circ_rc { circ } else { circ_rc };
i += 1;
} }
result result
} }
fn build_log_class_size<const N: usize>(norm: &[u64; N]) -> [f64; N] { const fn revcomp_raw(x: u64, k: usize) -> u64 {
let mut sizes = [0u32; N]; let x = !x;
for &c in norm { let x = x.swap_bytes();
sizes[c as usize] += 1; let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4);
} let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2);
let mut result = [0.0f64; N]; x << (64 - 2 * k)
for i in 0..N {
if sizes[i] > 0 {
result[i] = ln0(sizes[i] as f64);
}
}
result
} }
pub(crate) fn entropy_norm_kmer(kmer: u64, k: usize, left: bool) -> u64 { const fn k_from_n<const N: usize>() -> usize {
let shift = 64 - k * 2; match N {
let ra = if left { kmer >> shift } else { kmer }; // left-aligned → right-aligned index 4 => 1,
let canonical_ra = match k { 16 => 2,
1 => NORMK1[ra as usize], 64 => 3,
2 => NORMK2[ra as usize], 256 => 4,
3 => NORMK3[ra as usize], 1024 => 5,
4 => NORMK4[ra as usize], 4096 => 6,
5 => NORMK5[ra as usize], _ => panic!("N must be a power of 4"),
6 => NORMK6[ra as usize],
_ => panic!("k must be 1..=6"),
};
if left {
canonical_ra << shift
} else {
canonical_ra
} // right-aligned → left-aligned
}
pub(crate) fn ln_class_size(kmer: u64, k: usize, left: bool) -> f64 {
let ra = if left { kmer >> (64 - k * 2) } else { kmer }; // left-aligned → right-aligned index
match k {
1 => LN_CARD_ROT1[NORMK1[ra as usize] as usize],
2 => LN_CARD_ROT2[NORMK2[ra as usize] as usize],
3 => LN_CARD_ROT3[NORMK3[ra as usize] as usize],
4 => LN_CARD_ROT4[NORMK4[ra as usize] as usize],
5 => LN_CARD_ROT5[NORMK5[ra as usize] as usize],
6 => LN_CARD_ROT6[NORMK6[ra as usize] as usize],
_ => panic!("k must be 1..=6"),
} }
} }
// ── k-dependent tables (k ≤ K_MAX, ws ≤ WS_MAX) ──────────────────────────────
pub(crate) const K_MAX: usize = 32;
pub(crate) const WS_MAX: usize = 6; pub(crate) const WS_MAX: usize = 6;
/// n·ln(n), with n_log_n[0] = 0. Indexed by n = 0..=K_MAX. #[inline(always)]
pub(crate) static N_LOG_N: LazyLock<[f64; K_MAX + 1]> = LazyLock::new(|| build_n_log_n()); pub(crate) const fn n_log_n(n: usize) -> f64 {
/// H_max[k][ws]: maximum entropy for kmer length k and word size ws.
pub(crate) static EMAX: LazyLock<[[f64; WS_MAX + 1]; K_MAX + 1]> = LazyLock::new(|| build_emax());
/// ln(k ws + 1): log of the number of ws-words in a kmer of length k.
pub(crate) static LOG_NWORDS: LazyLock<[[f64; WS_MAX + 1]; K_MAX + 1]> =
LazyLock::new(|| build_log_nwords());
pub(crate) fn n_log_n(n: usize) -> f64 {
N_LOG_N[n] N_LOG_N[n]
} }
pub(crate) fn emax(k: usize, ws: usize) -> f64 { #[inline(always)]
pub(crate) const fn emax(k: usize, ws: usize) -> f64 {
EMAX[k][ws] EMAX[k][ws]
} }
pub(crate) fn log_nwords(k: usize, ws: usize) -> f64 { #[inline(always)]
pub(crate) const fn log_nwords(k: usize, ws: usize) -> f64 {
LOG_NWORDS[k][ws] LOG_NWORDS[k][ws]
} }
fn build_n_log_n() -> [f64; K_MAX + 1] { #[inline(always)]
let mut result = [0.0f64; K_MAX + 1]; pub(crate) const fn entropy_norm_kmer<const LEFT: bool, const K: usize>(kmer: u64) -> u64 {
for n in 1..=K_MAX { const SHIFT: [usize; 7] = [0, 62, 60, 58, 56, 54, 52];
result[n] = (n as f64) * (n as f64).ln(); const NORM: [&[u64]; 7] = [&[], &NORMK1, &NORMK2, &NORMK3, &NORMK4, &NORMK5, &NORMK6];
let shift = SHIFT[K];
let ra = if LEFT { kmer >> shift } else { kmer };
let canonical_ra = NORM[K][ra as usize];
if LEFT {
canonical_ra << shift
} else {
canonical_ra
} }
result
} }
fn build_emax() -> [[f64; WS_MAX + 1]; K_MAX + 1] { #[inline(always)]
let mut result = [[0.0f64; WS_MAX + 1]; K_MAX + 1]; pub(crate) const fn ln_class_size<const LEFT: bool, const K: usize>(kmer: u64) -> f64 {
for k in 2..=K_MAX { const SHIFT: [usize; 7] = [0, 62, 60, 58, 56, 54, 52];
for ws in 1..=WS_MAX.min(k - 1) { let ra = if LEFT { kmer >> SHIFT[K] } else { kmer };
let n_raw = 1usize << (ws * 2); // 4^ws match K {
let nwords = k - ws + 1; 1 => LN_CLASS1[ra as usize],
let c = nwords / n_raw; 2 => LN_CLASS2[ra as usize],
let r = nwords % n_raw; 3 => LN_CLASS3[ra as usize],
let nf = nwords as f64; 4 => LN_CLASS4[ra as usize],
let t1 = if c == 0 || n_raw == r { 5 => LN_CLASS5[ra as usize],
0.0 6 => LN_CLASS6[ra as usize],
} else { _ => panic!("k must be 1..=6"),
let f1 = c as f64 / nf;
(n_raw - r) as f64 * f1 * f1.ln()
};
let t2 = if r == 0 {
0.0
} else {
let f2 = (c + 1) as f64 / nf;
r as f64 * f2 * f2.ln()
};
result[k][ws] = -(t1 + t2);
} }
} }
result
}
fn build_log_nwords() -> [[f64; WS_MAX + 1]; K_MAX + 1] {
let mut result = [[0.0f64; WS_MAX + 1]; K_MAX + 1];
for k in 2..=K_MAX {
for ws in 1..=WS_MAX.min(k - 1) {
result[k][ws] = ((k - ws + 1) as f64).ln();
}
}
result
}
+6
View File
@@ -26,6 +26,7 @@ use crate::scratch::SuperKmerScratch;
pub struct SuperKmerIter<'a> { pub struct SuperKmerIter<'a> {
cursor: ForwardCursor<'a>, cursor: ForwardCursor<'a>,
k: usize, k: usize,
rope_len: usize,
theta: f64, theta: f64,
scratch: SuperKmerScratch, scratch: SuperKmerScratch,
stat: RollingStat, stat: RollingStat,
@@ -44,6 +45,7 @@ impl<'a> SuperKmerIter<'a> {
Self { Self {
cursor: rope.fw_cursor(), cursor: rope.fw_cursor(),
k, k,
rope_len: rope.len(),
theta, theta,
scratch: SuperKmerScratch::new(), scratch: SuperKmerScratch::new(),
stat: RollingStat::new(level_max), stat: RollingStat::new(level_max),
@@ -71,6 +73,10 @@ impl<'a> SuperKmerIter<'a> {
impl Iterator for SuperKmerIter<'_> { impl Iterator for SuperKmerIter<'_> {
type Item = RoutableSuperKmer; type Item = RoutableSuperKmer;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(self.rope_len / self.k))
}
fn next(&mut self) -> Option<RoutableSuperKmer> { fn next(&mut self) -> Option<RoutableSuperKmer> {
loop { loop {
let byte = match self.cursor.read_next().ok() { let byte = match self.cursor.read_next().ok() {
+252 -130
View File
@@ -4,36 +4,118 @@ use obikseq::params;
use crate::encoding::encode_nuc; use crate::encoding::encode_nuc;
use crate::entropy_table::{WS_MAX, emax, entropy_norm_kmer, ln_class_size, log_nwords, n_log_n}; use crate::entropy_table::{WS_MAX, emax, entropy_norm_kmer, ln_class_size, log_nwords, n_log_n};
#[derive(Clone, Copy)] // ── Stack-allocated ring buffer ───────────────────────────────────────────────
/// Fixed-capacity ring buffer backed by a stack array.
/// N must be a power of two; operations are branchless via `% N`.
struct Ring<T: Copy + Default, const N: usize> {
buf: [T; N],
head: usize,
len: usize,
}
impl<T: Copy + Default, const N: usize> Ring<T, N> {
#[inline]
fn new() -> Self {
Self {
buf: [T::default(); N],
head: 0,
len: 0,
}
}
#[inline]
fn is_empty(&self) -> bool {
self.len == 0
}
#[inline]
fn clear(&mut self) {
self.len = 0;
self.head = 0;
}
#[inline]
fn push_back(&mut self, val: T) {
self.buf[(self.head + self.len) % N] = val;
self.len += 1;
}
#[inline]
fn pop_front(&mut self) -> T {
let val = self.buf[self.head];
self.head = (self.head + 1) % N;
self.len -= 1;
val
}
#[inline]
fn pop_back(&mut self) -> T {
self.len -= 1;
self.buf[(self.head + self.len) % N]
}
#[inline]
fn front(&self) -> Option<&T> {
if self.len > 0 {
Some(&self.buf[self.head])
} else {
None
}
}
#[inline]
fn back(&self) -> Option<&T> {
if self.len > 0 {
Some(&self.buf[(self.head + self.len - 1) % N])
} else {
None
}
}
/// Iterate over elements front-to-back (copies, since T: Copy).
fn iter(&self) -> impl Iterator<Item = T> + '_ {
(0..self.len).map(move |i| self.buf[(self.head + i) % N])
}
}
// ── MmerItem ──────────────────────────────────────────────────────────────────
#[derive(Clone, Copy, Default)]
struct MmerItem { struct MmerItem {
/// 0-based position of this m-mer's first base within the current segment. /// 0-based position of this m-mer's first base within the current segment.
position: usize, position: usize,
/// Raw canonical m-mer value (right-aligned), used for partition key computation. /// Raw canonical m-mer value (right-aligned).
canonical: u64, canonical: u64,
/// mix64 hash of the canonical m-mer, used as the random ordering key. /// mix64 hash of the canonical m-mer, used as the random ordering key.
hash: u64, hash: u64,
} }
// ── RollingStat ───────────────────────────────────────────────────────────────
pub struct RollingStat { pub struct RollingStat {
entropy_max_k: usize, entropy_max_k: usize,
k: usize,
m: usize,
steady: bool,
rolling_k: u64, rolling_k: u64,
rolling_rck: u64, rolling_rck: u64,
k_mask: u64, k_mask: u64,
m_mask: u64, m_mask: u64,
received: usize, received: usize,
k1q: std::collections::VecDeque<u64>,
k2q: std::collections::VecDeque<u64>, // Sliding-window queues — stack-allocated, capacity ≤ k ≤ 31.
k3q: std::collections::VecDeque<u64>, k1q: Ring<u64, 32>,
k4q: std::collections::VecDeque<u64>, k2q: Ring<u64, 32>,
k5q: std::collections::VecDeque<u64>, k3q: Ring<u64, 32>,
k6q: std::collections::VecDeque<u64>, k4q: Ring<u64, 32>,
minimier: std::collections::VecDeque<MmerItem>, k5q: Ring<u64, 32>,
k1c: Vec<usize>, k6q: Ring<u64, 32>,
k2c: Vec<usize>, minimier: Ring<MmerItem, 32>,
k3c: Vec<usize>,
k4c: Vec<usize>, // Frequency count arrays.
k5c: Vec<usize>, // Max count per cell ≤ k ≤ 31 → u8 is sufficient.
k6c: Vec<usize>, k1c: [u8; 4],
k2c: [u8; 16],
k3c: [u8; 64],
k4c: [u8; 256],
k5c: [u8; 1024],
k6c: [u8; 4096],
sum_f_log_f: [f64; WS_MAX + 1], sum_f_log_f: [f64; WS_MAX + 1],
sum_f_log_s: [f64; WS_MAX + 1], sum_f_log_s: [f64; WS_MAX + 1],
} }
@@ -44,24 +126,27 @@ impl RollingStat {
let m = params::m(); let m = params::m();
Self { Self {
entropy_max_k, entropy_max_k,
k,
m,
steady: false,
rolling_k: 0, rolling_k: 0,
rolling_rck: 0, rolling_rck: 0,
k_mask: (!0u64) >> (64 - k * 2), k_mask: (!0u64) >> (64 - k * 2),
m_mask: (!0u64) >> (64 - m * 2), m_mask: (!0u64) >> (64 - m * 2),
received: 0, received: 0,
k1q: std::collections::VecDeque::with_capacity(k), k1q: Ring::new(),
k2q: std::collections::VecDeque::with_capacity(k - 1), k2q: Ring::new(),
k3q: std::collections::VecDeque::with_capacity(k - 2), k3q: Ring::new(),
k4q: std::collections::VecDeque::with_capacity(k - 3), k4q: Ring::new(),
k5q: std::collections::VecDeque::with_capacity(k - 4), k5q: Ring::new(),
k6q: std::collections::VecDeque::with_capacity(k - 5), k6q: Ring::new(),
minimier: std::collections::VecDeque::with_capacity(k - m + 2), minimier: Ring::new(),
k1c: vec![0; 4_usize.pow(1)], k1c: [0; 4],
k2c: vec![0; 4_usize.pow(2)], k2c: [0; 16],
k3c: vec![0; 4_usize.pow(3)], k3c: [0; 64],
k4c: vec![0; 4_usize.pow(4)], k4c: [0; 256],
k5c: vec![0; 4_usize.pow(5)], k5c: [0; 1024],
k6c: vec![0; 4_usize.pow(6)], k6c: [0; 4096],
sum_f_log_f: [0.0; WS_MAX + 1], sum_f_log_f: [0.0; WS_MAX + 1],
sum_f_log_s: [0.0; WS_MAX + 1], sum_f_log_s: [0.0; WS_MAX + 1],
} }
@@ -71,24 +156,22 @@ impl RollingStat {
self.rolling_k = 0; self.rolling_k = 0;
self.rolling_rck = 0; self.rolling_rck = 0;
self.received = 0; self.received = 0;
for &i in &self.k1q { self.steady = false;
self.k1c[i as usize] = 0;
} // for i in self.k1q.iter() { self.k1c[i as usize] = 0; }
for &i in &self.k2q { // for i in self.k2q.iter() { self.k2c[i as usize] = 0; }
self.k2c[i as usize] = 0; // for i in self.k3q.iter() { self.k3c[i as usize] = 0; }
} // for i in self.k4q.iter() { self.k4c[i as usize] = 0; }
for &i in &self.k3q { // for i in self.k5q.iter() { self.k5c[i as usize] = 0; }
self.k3c[i as usize] = 0; // for i in self.k6q.iter() { self.k6c[i as usize] = 0; }
}
for &i in &self.k4q { self.k1c.fill(0);
self.k4c[i as usize] = 0; self.k2c.fill(0);
} self.k3c.fill(0);
for &i in &self.k5q { self.k4c.fill(0);
self.k5c[i as usize] = 0; self.k5c.fill(0);
} self.k6c.fill(0);
for &i in &self.k6q {
self.k6c[i as usize] = 0;
}
self.k1q.clear(); self.k1q.clear();
self.k2q.clear(); self.k2q.clear();
self.k3q.clear(); self.k3q.clear();
@@ -96,37 +179,36 @@ impl RollingStat {
self.k5q.clear(); self.k5q.clear();
self.k6q.clear(); self.k6q.clear();
self.minimier.clear(); self.minimier.clear();
self.sum_f_log_f = [0.0; WS_MAX + 1]; self.sum_f_log_f = [0.0; WS_MAX + 1];
self.sum_f_log_s = [0.0; WS_MAX + 1]; self.sum_f_log_s = [0.0; WS_MAX + 1];
} }
#[inline] #[inline]
fn update_sums_decrement( fn update_sums_decrement<const K: usize>(
sum_f_log_f: &mut [f64; WS_MAX + 1], sum_f_log_f: &mut [f64; WS_MAX + 1],
sum_f_log_s: &mut [f64; WS_MAX + 1], sum_f_log_s: &mut [f64; WS_MAX + 1],
ws: usize,
canonical: u64, canonical: u64,
f: usize, f: usize,
) { ) {
sum_f_log_f[ws] += n_log_n(f - 1) - n_log_n(f); sum_f_log_f[K] += n_log_n(f - 1) - n_log_n(f);
sum_f_log_s[ws] -= ln_class_size(canonical, ws, false); sum_f_log_s[K] -= ln_class_size::<false, K>(canonical);
} }
#[inline] #[inline]
fn update_sums_increment( fn update_sums_increment<const K: usize>(
sum_f_log_f: &mut [f64; WS_MAX + 1], sum_f_log_f: &mut [f64; WS_MAX + 1],
sum_f_log_s: &mut [f64; WS_MAX + 1], sum_f_log_s: &mut [f64; WS_MAX + 1],
ws: usize,
canonical: u64, canonical: u64,
g: usize, g: usize,
) { ) {
sum_f_log_f[ws] += n_log_n(g + 1) - n_log_n(g); sum_f_log_f[K] += n_log_n(g + 1) - n_log_n(g);
sum_f_log_s[ws] += ln_class_size(canonical, ws, false); sum_f_log_s[K] += ln_class_size::<false, K>(canonical);
} }
pub fn push(&mut self, nuc: u8) { pub fn push(&mut self, nuc: u8) {
let k = params::k(); let k = self.k;
let m = params::m(); let m = self.m;
let bnuc = encode_nuc(nuc); let bnuc = encode_nuc(nuc);
let cnuc = bnuc ^ 3; let cnuc = bnuc ^ 3;
@@ -135,12 +217,12 @@ impl RollingStat {
self.rolling_rck = self.rolling_rck =
((self.rolling_rck >> 2) | ((cnuc as u64) << ((k - 1) * 2))) & self.k_mask; ((self.rolling_rck >> 2) | ((cnuc as u64) << ((k - 1) * 2))) & self.k_mask;
let canonical_k1 = entropy_norm_kmer(self.rolling_k & 3, 1, false); let canonical_k1 = entropy_norm_kmer::<false, 1>(self.rolling_k & 3);
let canonical_k2 = entropy_norm_kmer(self.rolling_k & 15, 2, false); let canonical_k2 = entropy_norm_kmer::<false, 2>(self.rolling_k & 15);
let canonical_k3 = entropy_norm_kmer(self.rolling_k & 63, 3, false); let canonical_k3 = entropy_norm_kmer::<false, 3>(self.rolling_k & 63);
let canonical_k4 = entropy_norm_kmer(self.rolling_k & 255, 4, false); let canonical_k4 = entropy_norm_kmer::<false, 4>(self.rolling_k & 255);
let canonical_k5 = entropy_norm_kmer(self.rolling_k & 1023, 5, false); let canonical_k5 = entropy_norm_kmer::<false, 5>(self.rolling_k & 1023);
let canonical_k6 = entropy_norm_kmer(self.rolling_k & 4095, 6, false); let canonical_k6 = entropy_norm_kmer::<false, 6>(self.rolling_k & 4095);
self.received += 1; self.received += 1;
@@ -175,107 +257,147 @@ impl RollingStat {
} }
if self.received > k { if self.received > k {
let old1 = self.k1q.pop_front().unwrap(); let old1 = self.k1q.pop_front();
let f1 = self.k1c[old1 as usize]; let f1 = self.k1c[old1 as usize] as usize;
Self::update_sums_decrement(&mut self.sum_f_log_f, &mut self.sum_f_log_s, 1, old1, f1); Self::update_sums_decrement::<1>(
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
old1,
f1,
);
self.k1c[old1 as usize] -= 1; self.k1c[old1 as usize] -= 1;
let old2 = self.k2q.pop_front().unwrap(); let old2 = self.k2q.pop_front();
let f2 = self.k2c[old2 as usize]; let f2 = self.k2c[old2 as usize] as usize;
Self::update_sums_decrement(&mut self.sum_f_log_f, &mut self.sum_f_log_s, 2, old2, f2); Self::update_sums_decrement::<2>(
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
old2,
f2,
);
self.k2c[old2 as usize] -= 1; self.k2c[old2 as usize] -= 1;
let old3 = self.k3q.pop_front().unwrap(); let old3 = self.k3q.pop_front();
let f3 = self.k3c[old3 as usize]; let f3 = self.k3c[old3 as usize] as usize;
Self::update_sums_decrement(&mut self.sum_f_log_f, &mut self.sum_f_log_s, 3, old3, f3); Self::update_sums_decrement::<3>(
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
old3,
f3,
);
self.k3c[old3 as usize] -= 1; self.k3c[old3 as usize] -= 1;
let old4 = self.k4q.pop_front().unwrap(); let old4 = self.k4q.pop_front();
let f4 = self.k4c[old4 as usize]; let f4 = self.k4c[old4 as usize] as usize;
Self::update_sums_decrement(&mut self.sum_f_log_f, &mut self.sum_f_log_s, 4, old4, f4); Self::update_sums_decrement::<4>(
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
old4,
f4,
);
self.k4c[old4 as usize] -= 1; self.k4c[old4 as usize] -= 1;
let old5 = self.k5q.pop_front().unwrap(); let old5 = self.k5q.pop_front();
let f5 = self.k5c[old5 as usize]; let f5 = self.k5c[old5 as usize] as usize;
Self::update_sums_decrement(&mut self.sum_f_log_f, &mut self.sum_f_log_s, 5, old5, f5); Self::update_sums_decrement::<5>(
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
old5,
f5,
);
self.k5c[old5 as usize] -= 1; self.k5c[old5 as usize] -= 1;
let old6 = self.k6q.pop_front().unwrap(); let old6 = self.k6q.pop_front();
let f6 = self.k6c[old6 as usize]; let f6 = self.k6c[old6 as usize] as usize;
Self::update_sums_decrement(&mut self.sum_f_log_f, &mut self.sum_f_log_s, 6, old6, f6); Self::update_sums_decrement::<6>(
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
old6,
f6,
);
self.k6c[old6 as usize] -= 1; self.k6c[old6 as usize] -= 1;
} }
let g1 = self.k1c[canonical_k1 as usize]; if self.steady {
Self::update_sums_increment( let g1 = self.k1c[canonical_k1 as usize] as usize;
&mut self.sum_f_log_f, Self::update_sums_increment::<1>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k1, g1);
&mut self.sum_f_log_s, self.k1c[canonical_k1 as usize] += 1;
1, self.k1q.push_back(canonical_k1);
canonical_k1,
g1, let g2 = self.k2c[canonical_k2 as usize] as usize;
Self::update_sums_increment::<2>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k2, g2);
self.k2c[canonical_k2 as usize] += 1;
self.k2q.push_back(canonical_k2);
let g3 = self.k3c[canonical_k3 as usize] as usize;
Self::update_sums_increment::<3>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k3, g3);
self.k3c[canonical_k3 as usize] += 1;
self.k3q.push_back(canonical_k3);
let g4 = self.k4c[canonical_k4 as usize] as usize;
Self::update_sums_increment::<4>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k4, g4);
self.k4c[canonical_k4 as usize] += 1;
self.k4q.push_back(canonical_k4);
let g5 = self.k5c[canonical_k5 as usize] as usize;
Self::update_sums_increment::<5>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k5, g5);
self.k5c[canonical_k5 as usize] += 1;
self.k5q.push_back(canonical_k5);
let g6 = self.k6c[canonical_k6 as usize] as usize;
Self::update_sums_increment::<6>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k6, g6);
self.k6c[canonical_k6 as usize] += 1;
self.k6q.push_back(canonical_k6);
} else {
self.push_warmup_increments(
canonical_k1, canonical_k2, canonical_k3,
canonical_k4, canonical_k5, canonical_k6,
); );
}
}
#[cold]
#[inline(never)]
fn push_warmup_increments(
&mut self,
canonical_k1: u64, canonical_k2: u64, canonical_k3: u64,
canonical_k4: u64, canonical_k5: u64, canonical_k6: u64,
) {
let g1 = self.k1c[canonical_k1 as usize] as usize;
Self::update_sums_increment::<1>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k1, g1);
self.k1c[canonical_k1 as usize] += 1; self.k1c[canonical_k1 as usize] += 1;
self.k1q.push_back(canonical_k1); self.k1q.push_back(canonical_k1);
if self.received >= 2 { if self.received >= 2 {
let g2 = self.k2c[canonical_k2 as usize]; let g2 = self.k2c[canonical_k2 as usize] as usize;
Self::update_sums_increment( Self::update_sums_increment::<2>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k2, g2);
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
2,
canonical_k2,
g2,
);
self.k2c[canonical_k2 as usize] += 1; self.k2c[canonical_k2 as usize] += 1;
self.k2q.push_back(canonical_k2); self.k2q.push_back(canonical_k2);
if self.received >= 3 { if self.received >= 3 {
let g3 = self.k3c[canonical_k3 as usize]; let g3 = self.k3c[canonical_k3 as usize] as usize;
Self::update_sums_increment( Self::update_sums_increment::<3>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k3, g3);
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
3,
canonical_k3,
g3,
);
self.k3c[canonical_k3 as usize] += 1; self.k3c[canonical_k3 as usize] += 1;
self.k3q.push_back(canonical_k3); self.k3q.push_back(canonical_k3);
if self.received >= 4 { if self.received >= 4 {
let g4 = self.k4c[canonical_k4 as usize]; let g4 = self.k4c[canonical_k4 as usize] as usize;
Self::update_sums_increment( Self::update_sums_increment::<4>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k4, g4);
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
4,
canonical_k4,
g4,
);
self.k4c[canonical_k4 as usize] += 1; self.k4c[canonical_k4 as usize] += 1;
self.k4q.push_back(canonical_k4); self.k4q.push_back(canonical_k4);
if self.received >= 5 { if self.received >= 5 {
let g5 = self.k5c[canonical_k5 as usize]; let g5 = self.k5c[canonical_k5 as usize] as usize;
Self::update_sums_increment( Self::update_sums_increment::<5>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k5, g5);
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
5,
canonical_k5,
g5,
);
self.k5c[canonical_k5 as usize] += 1; self.k5c[canonical_k5 as usize] += 1;
self.k5q.push_back(canonical_k5); self.k5q.push_back(canonical_k5);
if self.received >= 6 { if self.received >= 6 {
let g6 = self.k6c[canonical_k6 as usize]; let g6 = self.k6c[canonical_k6 as usize] as usize;
Self::update_sums_increment( Self::update_sums_increment::<6>(&mut self.sum_f_log_f, &mut self.sum_f_log_s, canonical_k6, g6);
&mut self.sum_f_log_f,
&mut self.sum_f_log_s,
6,
canonical_k6,
g6,
);
self.k6c[canonical_k6 as usize] += 1; self.k6c[canonical_k6 as usize] += 1;
self.k6q.push_back(canonical_k6); self.k6q.push_back(canonical_k6);
self.steady = true;
} }
} }
} }
@@ -284,7 +406,7 @@ impl RollingStat {
} }
pub fn ready(&self) -> bool { pub fn ready(&self) -> bool {
self.received >= params::k() self.received >= self.k
} }
pub fn minimizer_position(&self) -> Option<usize> { pub fn minimizer_position(&self) -> Option<usize> {
@@ -305,14 +427,14 @@ impl RollingStat {
pub fn canonical_minimizer(&self) -> Option<Minimizer> { pub fn canonical_minimizer(&self) -> Option<Minimizer> {
self.canonical_minimizer_raw() self.canonical_minimizer_raw()
.map(|raw| Minimizer::from_raw_unchecked(raw << (64 - params::m() * 2))) .map(|raw| Minimizer::from_raw_unchecked(raw << (64 - self.m * 2)))
} }
pub fn entropy(&self, order: usize) -> Option<f64> { pub fn entropy(&self, order: usize) -> Option<f64> {
if !self.ready() { if !self.ready() {
return None; return None;
} }
let k = params::k(); let k = self.k;
let em = emax(k, order); let em = emax(k, order);
if em <= 0.0 { if em <= 0.0 {
return Some(1.0); return Some(1.0);
+1 -1
View File
@@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex, OnceLock};
pub const MAX_POOL_SIZE: usize = 512; pub const MAX_POOL_SIZE: usize = 512;
/// Default pending buffer threshold (bytes) before draining to the physical fd. /// Default pending buffer threshold (bytes) before draining to the physical fd.
pub const DEFAULT_FLUSH_THRESHOLD: usize = 8 * 1024; pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024;
// Convenient alias for the per-entry physical writer slot. // Convenient alias for the per-entry physical writer slot.
type PhysWriter = Option<Box<dyn Write + Send>>; type PhysWriter = Option<Box<dyn Write + Send>>;
+7
View File
@@ -0,0 +1,7 @@
[package]
name = "obisys"
version = "0.1.0"
edition = "2024"
[dependencies]
libc = "0.2"
+243
View File
@@ -0,0 +1,243 @@
use std::fmt;
use std::time::Instant;
use libc::{RUSAGE_SELF, getrusage, rusage, timeval};
// ── raw helpers ───────────────────────────────────────────────────────────────
fn get_rusage() -> rusage {
let mut ru = unsafe { std::mem::zeroed::<rusage>() };
unsafe { getrusage(RUSAGE_SELF, &mut ru) };
ru
}
fn tv_to_secs(tv: timeval) -> f64 {
tv.tv_sec as f64 + tv.tv_usec as f64 * 1e-6
}
#[cfg(target_os = "macos")]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
#[cfg(not(target_os = "macos"))]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
// Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
// ── public API ────────────────────────────────────────────────────────────────
/// Snapshot taken at the start of a pipeline stage.
#[must_use = "call .stop() to record the stage"]
pub struct Stage {
label: String,
wall: Instant,
ru: rusage,
}
impl Stage {
pub fn start(label: impl Into<String>) -> Self {
Self { label: label.into(), wall: Instant::now(), ru: get_rusage() }
}
pub fn stop(self) -> StageStats {
let wall_secs = self.wall.elapsed().as_secs_f64();
let end = get_rusage();
StageStats {
label: self.label,
wall_secs,
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
max_rss_bytes: rss_to_bytes(&end),
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
}
}
}
/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas.
pub struct StageStats {
pub label: String,
pub wall_secs: f64,
pub user_secs: f64,
pub sys_secs: f64,
/// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum,
/// so this reflects the high-water mark up to and including this stage.
pub max_rss_bytes: u64,
pub minor_faults: u64,
pub major_faults: u64,
pub vol_ctx: u64, // voluntary context switches
pub invol_ctx: u64, // involuntary context switches
pub in_blocks: u64, // filesystem block reads (after page cache)
pub out_blocks: u64, // filesystem block writes
pub swaps: u64,
}
impl StageStats {
/// (user + sys) / wall — effective thread count utilisation.
pub fn parallelism(&self) -> f64 {
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
else { 0.0 }
}
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
pub fn efficiency(&self, n_cores: usize) -> f64 {
self.parallelism() / n_cores as f64
}
}
/// Accumulates stage stats and prints a human-readable summary table.
#[derive(Default)]
pub struct Reporter {
stages: Vec<StageStats>,
}
impl Reporter {
pub fn new() -> Self { Self::default() }
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
pub fn stages(&self) -> &[StageStats] { &self.stages }
/// Print the summary to stderr.
pub fn print(&self) { eprint!("{self}"); }
}
// ── diagnosis ─────────────────────────────────────────────────────────────────
struct Diagnosis {
tag: &'static str,
detail: Option<String>,
}
// Thresholds are intentionally conservative to avoid false positives.
fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
let eff = s.efficiency(n_cores);
let cpu_pct = eff * 100.0;
let io_ops = s.in_blocks + s.out_blocks;
// swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion.
// ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap
// page-ins (even from page cache), making it useless as a memory-pressure signal
// for mmap-heavy code. On Linux it is more meaningful, but swaps covers the
// severe case on both platforms.
if s.swaps > 0 {
return Diagnosis {
tag: "swapping",
detail: Some(format!(
"swapped {} time(s) — working set exceeds available RAM",
s.swaps,
)),
};
}
if eff < 0.3 && io_ops > 100 {
return Diagnosis {
tag: "disk I/O",
detail: Some(format!(
"{} block reads + {} writes — CPU at {:.0}%, stage is I/O-bound",
s.in_blocks, s.out_blocks, cpu_pct,
)),
};
}
if eff < 0.3 && s.vol_ctx > 200 {
return Diagnosis {
tag: "contention",
detail: Some(format!(
"{} voluntary context switches — CPU at {:.0}%, possible lock contention or I/O wait",
s.vol_ctx, cpu_pct,
)),
};
}
Diagnosis { tag: "", detail: None }
}
// ── display helpers ───────────────────────────────────────────────────────────
fn fmt_secs(s: f64) -> String {
if s >= 100.0 { format!("{:.0}s", s) }
else if s >= 10.0 { format!("{:.1}s", s) }
else if s >= 1.0 { format!("{:.2}s", s) }
else { format!("{:.0}ms", s * 1000.0) }
}
fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
else { format!("{:.0} KB", b as f64 / 1024.0) }
}
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
}
// ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() { return Ok(()); }
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// column widths
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
// efficiency col: worst-case width for this run's n_cores value
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12;
let sep = "".repeat(sep_w);
// header
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS")?;
writeln!(f, "{sep}")?;
// compute all diagnoses up front (needed for both table and footnotes)
let diagnoses: Vec<Diagnosis> = self.stages.iter()
.map(|s| diagnose(s, n_cores))
.collect();
// per-stage rows
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
s.label,
fmt_secs(s.wall_secs),
fmt_efficiency(s.parallelism(), n_cores),
fmt_bytes(s.max_rss_bytes),
d.tag,
)?;
}
// totals
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
writeln!(f, "{sep}")?;
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
"TOTAL",
fmt_secs(tw),
fmt_efficiency(tpar, n_cores),
fmt_bytes(trss),
)?;
// bottleneck footnotes (only if at least one anomaly detected)
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
.zip(diagnoses.iter())
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
.collect();
if !bottlenecks.is_empty() {
writeln!(f, "\nBottlenecks:")?;
for (label, detail) in &bottlenecks {
writeln!(f, " {label} — {detail}")?;
}
}
Ok(())
}
}