Files
obikmer/src/obidebruinj/src/debruijn.rs
T
Eric Coissac c694e1f2b0 feat: add benchmark pipeline, expose APIs, and enforce strict paths
Introduces a Make-based orchestration for simulating, indexing, merging, filtering, and verifying k-mer counts and presence. Exposes internal builder and iterator APIs publicly, enforces mandatory leading slashes for predicate patterns, registers the `obitaxonomy` crate, and updates tooling configurations alongside documentation.
2026-06-22 10:18:33 +02:00

588 lines
20 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//use ahash::RandomState;
use crossbeam_channel;
use hashbrown::HashMap;
use obikseq::k;
use obikseq::{CanonicalKmer, Sequence, Unitig};
#[cfg(not(any(test, feature = "test-utils")))]
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::cell::RefCell;
use std::fmt;
use std::sync::atomic::{AtomicU8, Ordering};
use xxhash_rust::xxh3::Xxh3Builder;
// ── Types ─────────────────────────────────────────────────────────────────────
type FastHashMap<K, V> = HashMap<K, V, Xxh3Builder>;
// ── Node ──────────────────────────────────────────────────────────────────────
//
// bit layout (LSB first):
// bit 0 : can_extend_right — exactly one right canonical neighbour exists
// bit 1 : can_extend_left — exactly one left canonical neighbour exists
// bit 2 : visited
// bits 34 : right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1
// bits 56 : left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1
// bit 7 : marked as start node (1)
//
// "can_extend" = false covers both 0 neighbours and ≥2 neighbours; the only
// information needed for traversal is "exactly one".
#[repr(transparent)]
#[derive(Debug, Clone, Copy, Default)]
pub struct Node(u8);
const CAN_EXTEND_RIGHT_MASK: u8 = 0b0000_0001; // bit 0: can_extend_right — exactly one right canonical neighbour exists
const CAN_EXTEND_LEFT_MASK: u8 = 0b0000_0010; // bit 1: can_extend_left — exactly one left canonical neighbour exists
const IS_VISITED_MASK: u8 = 0b0000_0100; // bit 2: visited
const RIGHT_NUC_MASK: u8 = 0b0001_1000; // bits 34: right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1
const LEFT_NUC_MASK: u8 = 0b0110_0000; // bits 56: left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1
const IS_START_MASK: u8 = 0b1000_0000; // bit 7: marked as start node
impl Node {
/// Returns `true` if the node can be extended to the right.
///
/// A single right neighbour exists.
#[inline]
pub fn can_extend_right(self) -> bool {
self.0 & CAN_EXTEND_RIGHT_MASK != 0
}
/// Returns `true` if the node can be extended to the left.
///
/// A single left neighbour exists.
#[inline]
pub fn can_extend_left(self) -> bool {
self.0 & CAN_EXTEND_LEFT_MASK != 0
}
/// Returns `true` if the node has been visited.
#[inline]
pub fn is_visited(self) -> bool {
self.0 & IS_VISITED_MASK != 0
}
/// Returns `true` if the node is a start node.
#[inline]
pub fn is_start(self) -> bool {
self.0 & IS_START_MASK != 0
}
#[inline]
pub fn set_start(&mut self) {
self.0 |= IS_START_MASK;
}
pub fn unset_start(&mut self) {
self.0 &= !IS_START_MASK;
}
/// Index of the unique right neighbour (0=A, 1=C, 2=G, 3=T).
/// Only meaningful when `can_extend_right()` is true.
#[inline]
pub fn right_nuc(self) -> u8 {
debug_assert!(
self.can_extend_right(),
"from: right_nuc -> The node cannot be extended to the right"
);
(self.0 >> 3) & 0b11
}
/// Index of the unique left neighbour (0=A, 1=C, 2=G, 3=T).
/// Only meaningful when `can_extend_left()` is true.
#[inline]
pub fn left_nuc(self) -> u8 {
debug_assert!(
self.can_extend_left(),
"from: left_nuc -> The node cannot be extended to the left"
);
(self.0 >> 5) & 0b11
}
/// Marks the node as visited.
#[inline]
pub fn set_visited(&mut self) {
debug_assert!(
!self.is_visited(),
"from: is_visited -> The node has already been visited"
);
self.0 |= IS_VISITED_MASK;
}
/// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index).
/// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1).
pub fn set_right(&mut self, count: u8, nuc: Option<u8>) {
self.0 &= !(CAN_EXTEND_RIGHT_MASK | RIGHT_NUC_MASK);
if count == 1 {
self.0 |= CAN_EXTEND_RIGHT_MASK;
if let Some(n) = nuc {
self.0 |= (n & 0b11) << 3;
return;
}
unreachable!("nuc must be Some when count is 1");
}
self.0 |= (count.saturating_sub(1).min(3)) << 3;
}
/// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index).
/// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1).
pub fn set_left(&mut self, count: u8, nuc: Option<u8>) {
self.0 &= !(CAN_EXTEND_LEFT_MASK | LEFT_NUC_MASK);
if count == 1 {
self.0 |= CAN_EXTEND_LEFT_MASK;
if let Some(n) = nuc {
self.0 |= (n & 0b11) << 5;
return;
}
unreachable!("nuc must be Some when count is 1");
}
self.0 |= (count.saturating_sub(1).min(3)) << 5;
}
}
impl fmt::Display for Node {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
const NUC: [char; 4] = ['A', 'C', 'G', 'T'];
let r = if self.can_extend_right() {
format!("→{}", NUC[self.right_nuc() as usize])
} else if (self.0 >> 3) & 0b11 == 0 {
"→0".to_string()
} else {
"→≥2".to_string()
};
let l = if self.can_extend_left() {
format!("←{}", NUC[self.left_nuc() as usize])
} else if (self.0 >> 5) & 0b11 == 0 {
"←0".to_string()
} else {
"←≥2".to_string()
};
let v = if self.is_visited() { "V" } else { "." };
write!(f, "Node({r} {l} {v})")
}
}
pub struct WalkState {
kmer: CanonicalKmer,
node: Node,
direct: bool,
}
impl WalkState {
pub fn new(kmer: CanonicalKmer, node: Node, direct: bool) -> Self {
debug_assert!(!node.is_visited(), "Cannot walk over a visited node");
Self { kmer, node, direct }
}
pub fn leavable(&self, graph: &GraphDeBruijn) -> bool {
self.walk(graph).is_some()
}
pub fn reachable(&self, graph: &GraphDeBruijn) -> bool {
WalkState {
kmer: self.kmer,
node: self.node,
direct: !self.direct,
}
.leavable(graph)
}
pub fn walk(&self, graph: &GraphDeBruijn) -> Option<(WalkState, u8)> {
if self.direct {
if !self.node.can_extend_right() {
return None;
}
let nuc = self.node.right_nuc();
let next = self.kmer.into_kmer().push_right(nuc);
let cnext = next.canonical();
let dnext = next.raw() == cnext.raw();
let next_node = Node(graph.nodes.get(&cnext).unwrap().load(Ordering::Relaxed));
if next_node.is_visited() {
return None;
}
let reachable = if dnext {
next_node.can_extend_left()
} else {
next_node.can_extend_right()
};
reachable.then_some((
WalkState {
kmer: cnext,
node: next_node,
direct: dnext,
},
nuc,
))
} else {
if !self.node.can_extend_left() {
return None;
}
let nuc = self.node.left_nuc();
let next = self.kmer.into_kmer().push_left(nuc);
let cnext = next.canonical();
let dnext = next.raw() != cnext.raw();
let next_node = Node(graph.nodes.get(&cnext).unwrap().load(Ordering::Relaxed));
if next_node.is_visited() {
return None;
}
let reachable = if dnext {
next_node.can_extend_right()
} else {
next_node.can_extend_left()
};
reachable.then_some((
WalkState {
kmer: cnext,
node: next_node,
direct: dnext,
},
3 - nuc,
))
}
}
}
// ── GraphDeBruijn ─────────────────────────────────────────────────────────────
pub struct GraphDeBruijn {
nodes: FastHashMap<CanonicalKmer, AtomicU8>,
}
impl GraphDeBruijn {
pub fn new() -> Self {
Self {
nodes: FastHashMap::with_hasher(Xxh3Builder::new()),
}
}
fn for_each_node<F>(&self, f: F)
where
F: Fn(&CanonicalKmer, &AtomicU8) + Sync,
{
#[cfg(not(any(test, feature = "test-utils")))]
self.nodes.par_iter().for_each(|(k, a)| f(k, a));
#[cfg(any(test, feature = "test-utils"))]
self.nodes.iter().for_each(|(k, a)| f(k, a));
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
nodes: FastHashMap::with_capacity_and_hasher(capacity, Xxh3Builder::new()),
}
}
/// Insert a canonical kmer into the graph. No-op if already present.
pub fn push(&mut self, kmer: CanonicalKmer) {
self.nodes.entry(kmer).or_insert_with(|| AtomicU8::new(0));
}
/// For every node, find its unique right/left canonical neighbour (if any)
/// and store the nucleotide index in the Node flags.
///
/// In production builds, runs in parallel across all nodes (each entry is
/// written by exactly one thread). In test builds, runs sequentially to
/// avoid propagating thread-local k/m values to rayon worker threads.
pub fn compute_degrees_and_mark_starts(&self) {
// Pass 1: count right/left neighbors for each node
self.for_each_node(|kmer, atomic| {
let mut old = Node(atomic.load(Ordering::Relaxed));
if old.is_visited() {
old.unset_start();
atomic.store(old.0, Ordering::Relaxed);
return;
}
let (rc, rn) = count_neighbors(&kmer.right_canonical_neighbors(), &self.nodes);
let (lc, ln) = count_neighbors(&kmer.left_canonical_neighbors(), &self.nodes);
let mut node = Node(0);
node.set_right(rc, rn);
node.set_left(lc, ln);
atomic.store(node.0, Ordering::Relaxed);
});
// Pass 2: mark start nodes
self.for_each_node(|kmer, atomic| {
let mut node = Node(atomic.load(Ordering::Relaxed));
if node.is_visited() {
return;
}
if self.is_start(*kmer, node) {
node.set_start();
atomic.store(node.0, Ordering::Relaxed);
}
});
}
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
self.nodes
.get(kmer)
.map(|a| Node(a.load(Ordering::Relaxed)).is_visited())
}
pub fn set_visited(&self, kmer: CanonicalKmer) {
if let Some(a) = self.nodes.get(&kmer) {
a.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
}
}
pub fn start_walk(&self, kmer: CanonicalKmer) -> WalkState {
let node = Node(
self.nodes
.get(&kmer)
.expect("Cannot start a walk from not indexed kmer")
.load(Ordering::Relaxed),
);
debug_assert!(
!node.is_visited(),
"Cannot start a walk from a visited node"
);
WalkState::new(kmer, node, true)
}
pub fn try_start_walk(&self, kmer: CanonicalKmer) -> Option<WalkState> {
let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed));
if node.is_visited() {
return None;
}
Some(WalkState::new(kmer, node, true))
}
fn unitig_nucleotides(&self, kmer: CanonicalKmer, k: usize) -> Option<UnitigNucIter<'_>> {
let old = self
.nodes
.get(&kmer)?
.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
if old & IS_VISITED_MASK != 0 {
return None;
}
let start = WalkState::new(kmer, Node(old), true);
let next_step = start.walk(self).and_then(|(next_state, nuc)| {
let ext_old = self
.nodes
.get(&next_state.kmer)?
.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
(ext_old & IS_VISITED_MASK == 0).then_some((next_state, nuc))
});
Some(UnitigNucIter {
graph: self,
start: kmer,
pos: 0,
k,
next_step,
})
}
pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) {
let k = k();
let n_chains = std::sync::atomic::AtomicUsize::new(0);
let n2 = std::sync::atomic::AtomicUsize::new(0);
// Boucle unique : traiter les starts, recalculer les arités, recommencer
loop {
let n_new = std::sync::atomic::AtomicUsize::new(0);
#[cfg(not(any(test, feature = "test-utils")))]
self.nodes
.par_iter()
.filter_map(|(&kmer, atomic)| {
Node(atomic.load(Ordering::Acquire))
.is_start()
.then_some(kmer)
})
.for_each(|kmer| {
if let Some(iter) = self.unitig_nucleotides(kmer, k) {
n_new.fetch_add(1, Ordering::Relaxed);
f(iter);
}
});
#[cfg(any(test, feature = "test-utils"))]
self.nodes.iter().for_each(|(&kmer, atomic)| {
if Node(atomic.load(Ordering::Acquire)).is_start() {
if let Some(iter) = self.unitig_nucleotides(kmer, k) {
n_new.fetch_add(1, Ordering::Relaxed);
f(iter);
}
}
});
let n = n_new.load(Ordering::Relaxed);
n_chains.fetch_add(n, Ordering::Relaxed);
if n == 0 {
break;
}
self.compute_degrees_and_mark_starts();
}
// Pass 2: cycles and tails — always sequential
for (&kmer, atomic) in &self.nodes {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_visited() {
continue;
}
let chain_start = self.find_chain_start(kmer);
if let Some(iter) = self.unitig_nucleotides(chain_start, k) {
n2.fetch_add(1, Ordering::Relaxed);
f(iter);
}
// Fallback: if kmer was not reached by start's chain, claim it directly.
// Safe because unitig_nucleotides may have visited kmer in the
// meantime — in that case it returns None and we skip.
if chain_start != kmer {
if let Some(iter) = self.unitig_nucleotides(kmer, k) {
n2.fetch_add(1, Ordering::Relaxed);
f(iter);
}
}
}
}
/// Merge `other` into `self`.
///
/// The caller guarantees that the two graphs have **disjoint** kmer sets —
/// this is structurally true when each graph was built from a distinct
/// minimizer partition. No conflict resolution is needed: entries are moved
/// directly without checking for duplicates.
pub fn merge(&mut self, other: Self) {
self.nodes.reserve(other.nodes.len());
self.nodes.extend(other.nodes);
}
fn find_chain_start(&self, kmer: CanonicalKmer) -> CanonicalKmer {
let node = Node(self.nodes[&kmer].load(Ordering::Acquire));
let mut state = WalkState::new(kmer, node, false);
let mut seen = std::collections::HashSet::new();
seen.insert((state.kmer.raw(), state.direct));
loop {
match state.walk(self) {
None => return state.kmer,
Some((next, _)) => {
if !seen.insert((next.kmer.raw(), next.direct)) {
return kmer;
}
state = next;
}
}
}
}
fn is_start(&self, query: CanonicalKmer, node: Node) -> bool {
!WalkState {
kmer: query,
node,
direct: true,
}
.reachable(self)
}
pub fn try_for_each_unitig<E, F>(&self, f: F) -> Result<(), E>
where
E: Send,
F: FnMut(&Unitig) -> Result<(), E> + Send,
{
thread_local! {
static BUF: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(4096));
}
let (tx, rx) = crossbeam_channel::bounded::<Unitig>(rayon::current_num_threads() * 256);
std::thread::scope(|s| {
let writer = s.spawn(move || -> Result<(), E> {
let mut f = f;
for unitig in rx {
f(&unitig)?;
}
Ok(())
});
self.for_each_unitig(|iter| {
BUF.with(|buf| {
let mut buf = buf.borrow_mut();
buf.clear();
buf.extend(iter);
tx.send(Unitig::from_nucleotides(&buf)).ok();
});
});
drop(tx);
writer.join().expect("writer thread panicked")
})
}
pub fn len(&self) -> usize {
self.nodes.len()
}
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
}
// ── UnitigNucIter ─────────────────────────────────────────────────────────────
pub struct UnitigNucIter<'a> {
graph: &'a GraphDeBruijn,
start: CanonicalKmer,
pos: usize,
k: usize,
next_step: Option<(WalkState, u8)>,
}
impl Iterator for UnitigNucIter<'_> {
type Item = u8;
fn next(&mut self) -> Option<u8> {
if self.pos < self.k {
let nuc = self.start.nucleotide(self.pos);
self.pos += 1;
Some(nuc)
} else if let Some((state, nuc)) = self.next_step.take() {
self.next_step = state.walk(self.graph).and_then(|(next_state, next_nuc)| {
let old = self
.graph
.nodes
.get(&next_state.kmer)?
.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
(old & IS_VISITED_MASK == 0).then_some((next_state, next_nuc))
});
Some(nuc)
} else {
None
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.k - self.pos.min(self.k), None)
}
}
/// Returns the count of neighbors and the index of the first
/// neighbor if exactly one of the four canonical neighbours exists in
/// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T).
/// Returns `None` for count = 0 or ≥2 existing neighbours.
fn count_neighbors(
neighbors: &[CanonicalKmer; 4],
nodes: &FastHashMap<CanonicalKmer, AtomicU8>,
) -> (u8, Option<u8>) {
let mut count = 0u8;
let mut nuc = 0u8;
for (i, neighbour) in neighbors.iter().enumerate() {
if let Some(a) = nodes.get(neighbour) {
if Node(a.load(Ordering::Relaxed)).is_visited() {
continue;
}
nuc = i as u8;
count += 1;
if count >= 2 {
return (2, None);
}
}
}
if count == 1 {
(1, Some(nuc))
} else {
(0, None)
}
}
// ── tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
#[path = "tests/debruijn.rs"]
mod tests;