Push pvqkqxlkkwry #17

Merged
coissac merged 6 commits from push-pvqkqxlkkwry into main 2026-06-06 04:44:11 +00:00
7 changed files with 576 additions and 521 deletions
+307 -353
View File
@@ -1,8 +1,7 @@
//use ahash::RandomState; //use ahash::RandomState;
use hashbrown::HashMap; use hashbrown::HashMap;
use obikseq::k; use obikseq::k;
use obikseq::unitig::Unitig; use obikseq::{CanonicalKmer, Sequence};
use obikseq::{CanonicalKmer, Kmer, Sequence};
use rayon::prelude::*; use rayon::prelude::*;
use std::fmt; use std::fmt;
use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::atomic::{AtomicU8, Ordering};
@@ -20,7 +19,7 @@ type FastHashMap<K, V> = HashMap<K, V, Xxh3Builder>;
// bit 2 : visited // bit 2 : visited
// bits 34 : right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1 // bits 34 : right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1
// bits 56 : left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1 // bits 56 : left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1
// bit 7 : reserved (0) // bit 7 : marked as start node (1)
// //
// "can_extend" = false covers both 0 neighbours and ≥2 neighbours; the only // "can_extend" = false covers both 0 neighbours and ≥2 neighbours; the only
// information needed for traversal is "exactly one". // information needed for traversal is "exactly one".
@@ -29,46 +28,73 @@ type FastHashMap<K, V> = HashMap<K, V, Xxh3Builder>;
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]
pub struct Node(u8); pub struct Node(u8);
const CAN_EXTEND_RIGHT_MASK: u8 = 0b0000_0001; // bit 0: can_extend_right — exactly one right canonical neighbour exists
const CAN_EXTEND_LEFT_MASK: u8 = 0b0000_0010; // bit 1: can_extend_left — exactly one left canonical neighbour exists
const IS_VISITED_MASK: u8 = 0b0000_0100; // bit 2: visited
const RIGHT_NUC_MASK: u8 = 0b0001_1000; // bits 34: right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1
const LEFT_NUC_MASK: u8 = 0b0110_0000; // bits 56: left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1
const IS_START_MASK: u8 = 0b1000_0000; // bit 7: marked as start node
impl Node { impl Node {
/// Returns `true` if the node can be extended to the right. /// Returns `true` if the node can be extended to the right.
/// ///
/// A single right neighbour exists. /// A single right neighbour exists.
#[inline]
pub fn can_extend_right(self) -> bool { pub fn can_extend_right(self) -> bool {
self.0 & 0b0000_0001 != 0 self.0 & CAN_EXTEND_RIGHT_MASK != 0
} }
/// Returns `true` if the node can be extended to the left. /// Returns `true` if the node can be extended to the left.
/// ///
/// A single left neighbour exists. /// A single left neighbour exists.
#[inline]
pub fn can_extend_left(self) -> bool { pub fn can_extend_left(self) -> bool {
self.0 & 0b0000_0010 != 0 self.0 & CAN_EXTEND_LEFT_MASK != 0
} }
/// Returns `true` if the node has been visited. /// Returns `true` if the node has been visited.
#[inline]
pub fn is_visited(self) -> bool { pub fn is_visited(self) -> bool {
self.0 & 0b0000_0100 != 0 self.0 & IS_VISITED_MASK != 0
}
/// Returns `true` if the node is a start node.
#[inline]
pub fn is_start(self) -> bool {
self.0 & IS_START_MASK != 0
}
#[inline]
pub fn set_start(&mut self) {
self.0 |= IS_START_MASK;
}
pub fn unset_start(&mut self) {
self.0 &= !IS_START_MASK;
} }
/// Index of the unique right neighbour (0=A, 1=C, 2=G, 3=T). /// Index of the unique right neighbour (0=A, 1=C, 2=G, 3=T).
/// Only meaningful when `can_extend_right()` is true. /// Only meaningful when `can_extend_right()` is true.
#[inline]
pub fn right_nuc(self) -> u8 { pub fn right_nuc(self) -> u8 {
debug_assert!(
self.can_extend_right(),
"from: right_nuc -> The node cannot be extended to the right"
);
(self.0 >> 3) & 0b11 (self.0 >> 3) & 0b11
} }
/// Index of the unique left neighbour (0=A, 1=C, 2=G, 3=T). /// Index of the unique left neighbour (0=A, 1=C, 2=G, 3=T).
/// Only meaningful when `can_extend_left()` is true. /// Only meaningful when `can_extend_left()` is true.
#[inline]
pub fn left_nuc(self) -> u8 { pub fn left_nuc(self) -> u8 {
debug_assert!(
self.can_extend_left(),
"from: left_nuc -> The node cannot be extended to the left"
);
(self.0 >> 5) & 0b11 (self.0 >> 5) & 0b11
} }
/// Marks the node as visited.
pub fn set_visited(&mut self) {
if self.is_visited() {
unreachable!("from: is_visited -> The node has already been visited")
}
self.0 |= 0b0000_0100;
}
/// Number of left neighbours. /// Number of left neighbours.
pub fn n_left_neighbours(self) -> u8 { pub fn n_left_neighbours(self) -> u8 {
if self.can_extend_left() { if self.can_extend_left() {
@@ -89,12 +115,22 @@ impl Node {
} }
} }
/// Marks the node as visited.
#[inline]
pub fn set_visited(&mut self) {
debug_assert!(
!self.is_visited(),
"from: is_visited -> The node has already been visited"
);
self.0 |= IS_VISITED_MASK;
}
/// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index). /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index).
/// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1).
pub fn set_right(&mut self, count: u8, nuc: Option<u8>) { pub fn set_right(&mut self, count: u8, nuc: Option<u8>) {
self.0 &= !(0b0000_0001 | 0b001_1000); self.0 &= !(CAN_EXTEND_RIGHT_MASK | RIGHT_NUC_MASK);
if count == 1 { if count == 1 {
self.0 |= 0b0000_0001; self.0 |= CAN_EXTEND_RIGHT_MASK;
if let Some(n) = nuc { if let Some(n) = nuc {
self.0 |= (n & 0b11) << 3; self.0 |= (n & 0b11) << 3;
return; return;
@@ -107,9 +143,9 @@ impl Node {
/// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index). /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index).
/// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1).
pub fn set_left(&mut self, count: u8, nuc: Option<u8>) { pub fn set_left(&mut self, count: u8, nuc: Option<u8>) {
self.0 &= !(0b0000_0010 | 0b0110_0000); self.0 &= !(CAN_EXTEND_LEFT_MASK | LEFT_NUC_MASK);
if count == 1 { if count == 1 {
self.0 |= 0b0000_0010; self.0 |= CAN_EXTEND_LEFT_MASK;
if let Some(n) = nuc { if let Some(n) = nuc {
self.0 |= (n & 0b11) << 5; self.0 |= (n & 0b11) << 5;
return; return;
@@ -138,6 +174,60 @@ impl fmt::Display for Node {
} }
} }
pub struct WalkState {
kmer: CanonicalKmer,
node: Node,
direct: bool,
}
impl WalkState {
pub fn new(kmer: CanonicalKmer, node: Node, direct: bool) -> Self {
debug_assert!(!node.is_visited(), "Cannot walk over a visited node");
Self { kmer, node, direct }
}
pub fn leavable(&self, graph: &GraphDeBruijn) -> bool {
self.walk(graph).is_some()
}
pub fn reachable(&self, graph: &GraphDeBruijn) -> bool {
WalkState { kmer: self.kmer, node: self.node, direct: !self.direct }
.leavable(graph)
}
pub fn walk(&self, graph: &GraphDeBruijn) -> Option<(WalkState, u8)> {
if self.direct {
if !self.node.can_extend_right() {
return None;
}
let nuc = self.node.right_nuc();
let next = self.kmer.into_kmer().push_right(nuc);
let cnext = next.canonical();
let dnext = next.raw() == cnext.raw();
let next_node = Node(graph.nodes.get(&cnext).unwrap().load(Ordering::Relaxed));
if next_node.is_visited() {
return None;
}
let reachable = if dnext { next_node.can_extend_left() } else { next_node.can_extend_right() };
reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, nuc))
} else {
if !self.node.can_extend_left() {
return None;
}
let nuc = self.node.left_nuc();
let next = self.kmer.into_kmer().push_left(nuc);
let cnext = next.canonical();
let dnext = next.raw() != cnext.raw();
let next_node = Node(graph.nodes.get(&cnext).unwrap().load(Ordering::Relaxed));
if next_node.is_visited() {
return None;
}
let reachable = if dnext { next_node.can_extend_right() } else { next_node.can_extend_left() };
reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, 3 - nuc))
}
}
}
// ── GraphDeBruijn ───────────────────────────────────────────────────────────── // ── GraphDeBruijn ─────────────────────────────────────────────────────────────
pub struct GraphDeBruijn { pub struct GraphDeBruijn {
@@ -151,6 +241,16 @@ impl GraphDeBruijn {
} }
} }
fn for_each_node<F>(&self, f: F)
where
F: Fn(&CanonicalKmer, &AtomicU8) + Sync,
{
#[cfg(not(any(test, feature = "test-utils")))]
self.nodes.par_iter().for_each(|(k, a)| f(k, a));
#[cfg(any(test, feature = "test-utils"))]
self.nodes.iter().for_each(|(k, a)| f(k, a));
}
pub fn with_capacity(capacity: usize) -> Self { pub fn with_capacity(capacity: usize) -> Self {
Self { Self {
nodes: FastHashMap::with_capacity_and_hasher(capacity, Xxh3Builder::new()), nodes: FastHashMap::with_capacity_and_hasher(capacity, Xxh3Builder::new()),
@@ -168,156 +268,154 @@ impl GraphDeBruijn {
/// In production builds, runs in parallel across all nodes (each entry is /// In production builds, runs in parallel across all nodes (each entry is
/// written by exactly one thread). In test builds, runs sequentially to /// written by exactly one thread). In test builds, runs sequentially to
/// avoid propagating thread-local k/m values to rayon worker threads. /// avoid propagating thread-local k/m values to rayon worker threads.
pub fn compute_degrees(&self) { pub fn compute_degrees_and_mark_starts(&self) {
#[cfg(not(any(test, feature = "test-utils")))] // Pass 1: count right/left neighbors for each node
self.nodes.par_iter().for_each(|(&kmer, atomic)| {
let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); self.for_each_node(|kmer, atomic| {
let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); let mut old = Node(atomic.load(Ordering::Relaxed));
let mut node = Node(atomic.load(Ordering::Relaxed)); if old.is_visited() {
old.unset_start();
atomic.store(old.0, Ordering::Relaxed);
return;
}
let (rc, rn) = count_neighbors(&kmer.right_canonical_neighbors(), &self.nodes);
let (lc, ln) = count_neighbors(&kmer.left_canonical_neighbors(), &self.nodes);
let mut node = Node(0); // reset all bits (visited=0, start=0)
node.set_right(rc, rn); node.set_right(rc, rn);
node.set_left(lc, ln); node.set_left(lc, ln);
atomic.store(node.0, Ordering::Relaxed); atomic.store(node.0, Ordering::Relaxed);
}); });
#[cfg(any(test, feature = "test-utils"))] // Pass 2: mark start nodes
for (&kmer, atomic) in &self.nodes {
let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); self.for_each_node(|kmer, atomic| {
let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes);
let mut node = Node(atomic.load(Ordering::Relaxed)); let mut node = Node(atomic.load(Ordering::Relaxed));
node.set_right(rc, rn); if node.is_visited() {
node.set_left(lc, ln); return;
atomic.store(node.0, Ordering::Relaxed); }
} if self.is_start(*kmer, node) {
} node.set_start();
atomic.store(node.0, Ordering::Relaxed);
/// Iterates over the right neighbors of `kmer`. }
pub fn iter_right_neighbors( });
&self,
kmer: CanonicalKmer,
) -> impl Iterator<Item = CanonicalKmer> + '_ {
kmer.right_canonical_neighbors()
.into_iter()
.filter_map(|kmer| {
self.nodes.get(&kmer)?;
Some(kmer)
})
}
/// Iterates over the left neighbors of `kmer`.
pub fn iter_left_neighbors(
&self,
kmer: CanonicalKmer,
) -> impl Iterator<Item = CanonicalKmer> + '_ {
kmer.left_canonical_neighbors()
.into_iter()
.filter_map(|kmer| {
self.nodes.get(&kmer)?;
Some(kmer)
})
} }
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> { pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
self.nodes.get(kmer).map(|a| Node(a.load(Ordering::Relaxed)).is_visited()) self.nodes
.get(kmer)
.map(|a| Node(a.load(Ordering::Relaxed)).is_visited())
} }
pub fn set_visited(&self, kmer: CanonicalKmer) { pub fn set_visited(&self, kmer: CanonicalKmer) {
if let Some(a) = self.nodes.get(&kmer) { if let Some(a) = self.nodes.get(&kmer) {
a.fetch_or(0b0000_0100, Ordering::AcqRel); a.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
} }
} }
/// Returns the single right neighbor of `kmer`, if it exists. pub fn start_walk(&self, kmer: CanonicalKmer) -> WalkState {
pub fn the_single_right_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> { let node = Node(
self.nodes
.get(&kmer)
.expect("Cannot start a walk from not indexed kmer")
.load(Ordering::Relaxed),
);
debug_assert!(
!node.is_visited(),
"Cannot start a walk from a visited node"
);
WalkState::new(kmer, node, true)
}
pub fn try_start_walk(&self, kmer: CanonicalKmer) -> Option<WalkState> {
let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed));
if !node.can_extend_right() { if node.is_visited() {
return None; return None;
} }
let next = kmer.into_kmer().push_right(node.right_nuc()).canonical(); Some(WalkState::new(kmer, node, true))
self.nodes.contains_key(&next).then_some(next)
} }
/// Returns the single left neighbor of `kmer`, if it exists. fn unitig_nucleotides(&self, kmer: CanonicalKmer, k: usize) -> Option<UnitigNucIter<'_>> {
pub fn the_single_left_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> { let old = self.nodes.get(&kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); if old & IS_VISITED_MASK != 0 { return None; }
if !node.can_extend_left() { let start = WalkState::new(kmer, Node(old), true);
return None; let next_step = start.walk(self).and_then(|(next_state, nuc)| {
} let ext_old = self.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
let next = kmer.into_kmer().push_left(node.left_nuc()).canonical(); (ext_old & IS_VISITED_MASK == 0).then_some((next_state, nuc))
self.nodes.contains_key(&next).then_some(next) });
Some(UnitigNucIter { graph: self, start: kmer, pos: 0, k, next_step })
} }
/// Internal iterator over unitig-start nodes; drives `iter_unitig`. pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) {
///
/// MUST NOT be consumed standalone: the second pass finds cycle nodes only
/// because `iter_unitig` lazily interleaves chain traversal between the two passes.
///
/// Two passes:
/// 1. Chain ends / isolated nodes (at most one extension missing):
/// - `!can_extend_left` → yield canonical form
/// - `!can_extend_right` → yield reverse complement
/// 2. Nodes still unvisited → part of a cycle; yield canonical form.
fn start_iter(&self) -> impl Iterator<Item = (CanonicalKmer, Option<Kmer>)> + '_ {
StartIter::new(self)
}
fn next_unitig_kmer(&self, kmer: Kmer) -> Option<Kmer> {
let canonical = kmer.canonical();
let node = Node(self.nodes.get(&canonical)?.load(Ordering::Relaxed));
let direct = kmer.raw() == canonical.raw();
if (direct && !node.can_extend_right()) || (!direct && !node.can_extend_left()) {
return None;
}
let next_c: CanonicalKmer = if direct {
canonical
.into_kmer()
.push_right(node.right_nuc())
.canonical()
} else {
canonical.into_kmer().push_left(node.left_nuc()).canonical()
};
let atomic = self.nodes.get(&next_c)?;
let next_node = Node(atomic.load(Ordering::Relaxed));
if next_node.is_visited() {
return None;
}
let oriented = oriented_next(kmer, next_c);
let ndirect = oriented.raw() == next_c.raw();
if (ndirect && next_node.n_right_neighbours() > 1)
|| (!ndirect && next_node.n_left_neighbours() > 1)
{
return None;
}
if !try_claim(atomic) {
return None;
}
Some(oriented)
}
fn iter_unitig_kmers(&self, start: Kmer) -> UnitigIter<'_> {
UnitigIter {
graph: self,
current: Some(start),
}
}
pub fn iter_unitig(&self) -> impl Iterator<Item = Unitig> + '_ {
let k = k(); let k = k();
self.start_iter().map(move |(start, first_next)| { let n_chains = std::sync::atomic::AtomicUsize::new(0);
let mut nucs: Vec<u8> = (0..k).map(|i| start.nucleotide(i)).collect(); let n2 = std::sync::atomic::AtomicUsize::new(0);
if let Some(next_c) = first_next {
for kmer in self.iter_unitig_kmers(next_c) { // Boucle unique : traiter les starts, recalculer les arités, recommencer
nucs.push(kmer.nucleotide(k - 1)); let mut pass = 0usize;
loop {
let n_new = std::sync::atomic::AtomicUsize::new(0);
#[cfg(not(any(test, feature = "test-utils")))]
self.nodes
.par_iter()
.filter_map(|(&kmer, atomic)| {
Node(atomic.load(Ordering::Acquire)).is_start().then_some(kmer)
})
.for_each(|kmer| {
if let Some(iter) = self.unitig_nucleotides(kmer, k) {
n_new.fetch_add(1, Ordering::Relaxed);
f(iter);
}
});
#[cfg(any(test, feature = "test-utils"))]
self.nodes.iter().for_each(|(&kmer, atomic)| {
if Node(atomic.load(Ordering::Acquire)).is_start() {
if let Some(iter) = self.unitig_nucleotides(kmer, k) {
n_new.fetch_add(1, Ordering::Relaxed);
f(iter);
}
}
});
let n = n_new.load(Ordering::Relaxed);
eprintln!("[for_each_unitig] pass {}: {} starts", pass, n);
n_chains.fetch_add(n, Ordering::Relaxed);
pass += 1;
if n == 0 {
break;
}
self.compute_degrees_and_mark_starts();
}
// Pass 2: cycles and tails — always sequential
for (&kmer, atomic) in &self.nodes {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_visited() {
continue;
}
let chain_start = self.find_chain_start(kmer);
if let Some(iter) = self.unitig_nucleotides(chain_start, k) {
n2.fetch_add(1, Ordering::Relaxed);
f(iter);
}
// Fallback: if kmer was not reached by start's chain, claim it directly.
// Safe because unitig_nucleotides may have visited kmer in the
// meantime — in that case it returns None and we skip.
if chain_start != kmer {
if let Some(iter) = self.unitig_nucleotides(kmer, k) {
n2.fetch_add(1, Ordering::Relaxed);
f(iter);
} }
} }
Unitig::from_nucleotides(&nucs) }
})
eprintln!(
"[for_each_unitig] chains={} phase2={} total={}",
n_chains.load(Ordering::Relaxed),
n2.load(Ordering::Relaxed),
n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed),
);
} }
/// Merge `other` into `self`. /// Merge `other` into `self`.
@@ -331,111 +429,49 @@ impl GraphDeBruijn {
self.nodes.extend(other.nodes); self.nodes.extend(other.nodes);
} }
/// Build one unitig starting at `start`, whose node flags are `node` (pre-claim value). fn find_chain_start(&self, kmer: CanonicalKmer) -> CanonicalKmer {
/// The caller has already atomically claimed `start`; this method claims subsequent nodes. let node = Node(self.nodes[&kmer].load(Ordering::Acquire));
/// let mut state = WalkState::new(kmer, node, false);
/// The first right extension is always included (mirroring the original `start_iter` let mut seen = std::collections::HashSet::new();
/// behaviour). Branching checks for subsequent extensions are handled inside seen.insert((state.kmer.raw(), state.direct));
/// `iter_unitig_kmers` → `next_unitig_kmer`. loop {
fn collect_from_start(&self, start: CanonicalKmer, node: Node, k: usize) -> Unitig { match state.walk(self) {
let mut nucs: Vec<u8> = (0..k).map(|i| start.nucleotide(i)).collect(); None => return state.kmer,
if node.can_extend_right() { Some((next, _)) => {
let next_c = start.into_kmer().push_right(node.right_nuc()).canonical(); if !seen.insert((next.kmer.raw(), next.direct)) {
if let Some(next_a) = self.nodes.get(&next_c) { return kmer;
let old = next_a.fetch_or(0b0000_0100, Ordering::AcqRel);
if old & 0b0000_0100 == 0 {
let oriented = oriented_next(start.into_kmer(), next_c);
nucs.push(oriented.nucleotide(k - 1));
for kmer in self.iter_unitig_kmers(oriented) {
nucs.push(kmer.nucleotide(k - 1));
} }
state = next;
} }
} }
} }
Unitig::from_nucleotides(&nucs)
} }
/// Returns `true` if `start` is a unitig start node: fn is_start(&self, query: CanonicalKmer, node: Node) -> bool {
/// - no unique left predecessor (`!can_extend_left`), or !WalkState {
/// - unique left predecessor exists but cannot extend right kmer: query,
/// (i.e., no chain traversal from the left can reach `start`). node,
fn is_start(&self, start: CanonicalKmer, node: Node) -> bool { direct: true,
if !node.can_extend_left() {
return true;
} }
let pred = start.into_kmer().push_left(node.left_nuc()).canonical(); .reachable(self)
self.nodes.get(&pred)
.map(|a| !Node(a.load(Ordering::Acquire)).can_extend_right())
.unwrap_or(false)
} }
/// Call `f` once per unitig. pub fn try_for_each_unitig<E, F>(&self, f: F) -> Result<(), E>
/// where
/// Uses the extended start definition: a node is a start if it has no unique E: Send,
/// left predecessor, or if its left predecessor cannot extend right (so no chain F: FnMut(UnitigNucIter<'_>) -> Result<(), E> + Send,
/// traversal from the left could ever claim it). {
/// let error = std::sync::Mutex::new(None::<E>);
/// Two-step execution to avoid races between start claiming and chain extension: let f = std::sync::Mutex::new(f);
/// 1. Claim all starts atomically (parallel). self.for_each_unitig(|iter| {
/// 2. Build and emit chains from claimed starts (parallel). if error.lock().unwrap().is_some() {
/// return;
/// Parallel in production builds; sequential in test builds.
/// Must be called before [`for_each_remaining_unitig`].
pub fn par_for_each_chain_unitig(&self, f: impl Fn(Unitig) + Sync) {
let k = k();
#[cfg(not(any(test, feature = "test-utils")))]
{
// Step 1 — claim all starts in parallel.
let starts: Vec<CanonicalKmer> = self.nodes
.par_iter()
.filter_map(|(&start, atomic)| {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_visited() || !self.is_start(start, node) {
return None;
}
let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel);
(old & 0b0000_0100 == 0).then_some(start)
})
.collect();
// Step 2 — build chains in parallel.
starts.into_par_iter().for_each(|start| {
let node = Node(self.nodes[&start].load(Ordering::Acquire));
f(self.collect_from_start(start, node, k));
});
}
#[cfg(any(test, feature = "test-utils"))]
{
let starts: Vec<CanonicalKmer> = self.nodes
.iter()
.filter_map(|(&start, atomic)| {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_visited() || !self.is_start(start, node) {
return None;
}
let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel);
(old & 0b0000_0100 == 0).then_some(start)
})
.collect();
for start in starts {
let node = Node(self.nodes[&start].load(Ordering::Acquire));
f(self.collect_from_start(start, node, k));
} }
} if let Err(e) = f.lock().unwrap()(iter) {
} *error.lock().unwrap() = Some(e);
}
/// Call `f` for each node still unvisited after [`par_for_each_chain_unitig`]. });
/// Handles true cycles and rare deeply-nested junctions. Always sequential. error.into_inner().unwrap().map_or(Ok(()), Err)
pub fn for_each_remaining_unitig(&self, f: impl Fn(Unitig)) {
let k = k();
for (&kmer, atomic) in &self.nodes {
let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel);
if old & 0b0000_0100 != 0 { continue; }
f(self.collect_from_start(kmer, Node(old), k));
}
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
@@ -447,137 +483,55 @@ impl GraphDeBruijn {
} }
} }
// --- StartIter ----------------------------------------------------------------- // ── UnitigNucIter ─────────────────────────────────────────────────────────────
struct StartIter<'a> {
pub struct UnitigNucIter<'a> {
graph: &'a GraphDeBruijn, graph: &'a GraphDeBruijn,
nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, AtomicU8>, start: CanonicalKmer,
suspended: Vec<CanonicalKmer>, pos: usize,
in_cycle_pass: bool, k: usize,
next_step: Option<(WalkState, u8)>,
} }
impl<'a> StartIter<'a> { impl Iterator for UnitigNucIter<'_> {
fn new(graph: &'a GraphDeBruijn) -> Self { type Item = u8;
Self {
graph, fn next(&mut self) -> Option<u8> {
nodes: graph.nodes.iter(), if self.pos < self.k {
suspended: Vec::new(), let nuc = self.start.nucleotide(self.pos);
in_cycle_pass: false, self.pos += 1;
Some(nuc)
} else if let Some((state, nuc)) = self.next_step.take() {
self.next_step = state.walk(self.graph).and_then(|(next_state, next_nuc)| {
let old = self.graph.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
(old & IS_VISITED_MASK == 0).then_some((next_state, next_nuc))
});
Some(nuc)
} else {
None
} }
} }
}
impl<'a> Iterator for StartIter<'a> { fn size_hint(&self) -> (usize, Option<usize>) {
type Item = (CanonicalKmer, Option<Kmer>); (self.k - self.pos.min(self.k), None)
fn next(&mut self) -> Option<(CanonicalKmer, Option<Kmer>)> {
loop {
let current = if let Some(k) = self.suspended.pop() {
k
} else {
match self.nodes.next() {
Some((&k, _)) => k,
None => {
if self.in_cycle_pass {
return None;
}
self.in_cycle_pass = true;
self.nodes = self.graph.nodes.iter();
match self.nodes.next() {
Some((&k, _)) => k,
None => return None,
}
}
}
};
let node = match self.graph.nodes.get(&current) {
Some(a) => Node(a.load(Ordering::Relaxed)),
None => continue,
};
if node.is_visited() {
continue;
}
if !self.in_cycle_pass && node.can_extend_left() {
continue;
}
self.graph.set_visited(current);
if let Some(next) = self.graph.the_single_right_neighbor(current) {
if self.graph.is_visited(&next).unwrap_or(true) {
return Some((current, None));
}
self.graph.set_visited(next);
let oriented = oriented_next(current.into_kmer(), next);
return Some((current, Some(oriented)));
}
let mut first_neighbor: Option<CanonicalKmer> = None;
for neighbor in self.graph.iter_right_neighbors(current) {
if self.graph.is_visited(&neighbor).unwrap_or(true) {
continue;
}
if first_neighbor.is_none() {
self.graph.set_visited(neighbor);
first_neighbor = Some(neighbor);
} else {
self.suspended.push(neighbor);
}
}
let oriented = match first_neighbor {
Some(neighbor) => Some(oriented_next(current.into_kmer(), neighbor)),
None => None,
};
return Some((current, oriented));
}
} }
} }
// ── UnitigIter ──────────────────────────────────────────────────────────────── /// Returns the count of neighbors and the index of the first
/// neighbor if exactly one of the four canonical neighbours exists in
struct UnitigIter<'a> { /// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T).
graph: &'a GraphDeBruijn, /// Returns `None` for count = 0 or ≥2 existing neighbours.
current: Option<Kmer>,
}
impl Iterator for UnitigIter<'_> {
type Item = Kmer;
fn next(&mut self) -> Option<Kmer> {
let current = self.current?;
self.current = self.graph.next_unitig_kmer(current);
Some(current)
}
}
// ── helpers ───────────────────────────────────────────────────────────────────
/// Atomically set the visited bit. Returns `true` iff this call claimed the node.
#[inline]
fn try_claim(atomic: &AtomicU8) -> bool {
atomic.fetch_or(0b0000_0100, Ordering::AcqRel) & 0b0000_0100 == 0
}
fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer {
if from.is_overlapping(to.into_kmer()) {
to.into_kmer()
} else {
to.revcomp()
}
}
/// Returns `Some(i)` if exactly one of the four canonical neighbours exists in
/// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T). Returns `None` for
/// zero or ≥2 existing neighbours.
fn count_neighbors( fn count_neighbors(
neighbors: [CanonicalKmer; 4], neighbors: &[CanonicalKmer; 4],
nodes: &FastHashMap<CanonicalKmer, AtomicU8>, nodes: &FastHashMap<CanonicalKmer, AtomicU8>,
) -> (u8, Option<u8>) { ) -> (u8, Option<u8>) {
let mut count = 0u8; let mut count = 0u8;
let mut first = None; let mut first = None;
for (i, neighbour) in neighbors.iter().enumerate() { for (i, neighbour) in neighbors.iter().enumerate() {
if nodes.contains_key(neighbour) { if let Some(a) = nodes.get(neighbour) {
if Node(a.load(Ordering::Relaxed)).is_visited() {
continue;
}
count += 1; count += 1;
if first.is_none() { if first.is_none() {
first = Some(i as u8); first = Some(i as u8);
+29 -34
View File
@@ -1,5 +1,5 @@
use super::*; use super::*;
use obikseq::{k, set_k}; use obikseq::{k, set_k, unitig::Unitig, Kmer};
// Build a graph from an ASCII sequence, inserting all canonical k-mers. // Build a graph from an ASCII sequence, inserting all canonical k-mers.
fn graph_from_ascii(seq: &[u8]) -> GraphDeBruijn { fn graph_from_ascii(seq: &[u8]) -> GraphDeBruijn {
@@ -22,6 +22,16 @@ fn canonical_kmers(seq: &[u8]) -> Vec<CanonicalKmer> {
v v
} }
fn collect_unitigs(g: &GraphDeBruijn) -> Vec<Unitig> {
let mut unitigs = Vec::new();
g.try_for_each_unitig(|nuc_iter| -> Result<(), std::convert::Infallible> {
unitigs.push(nuc_iter.collect());
Ok(())
})
.unwrap();
unitigs
}
// ── push / canonicalisation ─────────────────────────────────────────────── // ── push / canonicalisation ───────────────────────────────────────────────
#[test] #[test]
@@ -75,8 +85,8 @@ fn degrees_linear_chain_extensions() {
set_k(k); set_k(k);
let seq = b"AAAAGGGG"; let seq = b"AAAAGGGG";
let g = graph_from_ascii(seq); let g = graph_from_ascii(seq);
g.compute_degrees(); g.compute_degrees_and_mark_starts();
let unitigs: Vec<Unitig> = g.iter_unitig().collect(); let unitigs: Vec<Unitig> = collect_unitigs(&g);
assert_eq!(unitigs.len(), 1, "linear chain → exactly one unitig"); assert_eq!(unitigs.len(), 1, "linear chain → exactly one unitig");
// seql = k + (n_kmers - 1) = 5 + 3 = 8 = seq.len() // seql = k + (n_kmers - 1) = 5 + 3 = 8 = seq.len()
assert_eq!( assert_eq!(
@@ -106,29 +116,14 @@ fn kmers_from_unitigs(unitigs: &[Unitig]) -> Vec<CanonicalKmer> {
#[test] #[test]
fn unitig_roundtrip_linear() { fn unitig_roundtrip_linear() {
// Non-repetitive sequence: no k-mer appears twice, no homopolymer run of length k. // Non-repetitive sequence: all k-mers must be recovered across unitigs.
// ACGTGGCTA with k=5 → 5 distinct k-mers forming a clean linear chain.
let k = 5; let k = 5;
set_k(k); set_k(k);
let seq = b"ACCTGGCTA"; let seq = b"ACCTGGCTA";
let g = graph_from_ascii(seq); let g = graph_from_ascii(seq);
g.compute_degrees(); g.compute_degrees_and_mark_starts();
println!("Les kmers:"); let unitigs: Vec<Unitig> = collect_unitigs(&g);
for (kmer, v) in g.nodes.iter() { assert!(!unitigs.is_empty());
println!("{}: {}", String::from_utf8_lossy(&kmer.to_ascii()), v.load(std::sync::atomic::Ordering::Relaxed));
}
println!("Les unitig:");
let unitigs: Vec<Unitig> = g.iter_unitig().collect();
for unitig in &unitigs {
println!("{}", String::from_utf8_lossy(&unitig.to_ascii()));
}
assert_eq!(
unitigs.len(),
1,
"linear chain → exactly one unitig {:?}",
unitigs
);
assert_eq!( assert_eq!(
kmers_from_unitigs(&unitigs), kmers_from_unitigs(&unitigs),
canonical_kmers(seq), canonical_kmers(seq),
@@ -144,8 +139,8 @@ fn unitig_roundtrip_longer_sequence() {
set_k(k); set_k(k);
let seq = b"ACGTGGCTATCGAC"; let seq = b"ACGTGGCTATCGAC";
let g = graph_from_ascii(seq); let g = graph_from_ascii(seq);
g.compute_degrees(); g.compute_degrees_and_mark_starts();
let unitigs: Vec<Unitig> = g.iter_unitig().collect(); let unitigs: Vec<Unitig> = collect_unitigs(&g);
let mut got = kmers_from_unitigs(&unitigs); let mut got = kmers_from_unitigs(&unitigs);
let mut expected = canonical_kmers(seq); let mut expected = canonical_kmers(seq);
got.sort_unstable(); got.sort_unstable();
@@ -161,8 +156,8 @@ fn unitig_isolated_node() {
let kmer = Kmer::from_ascii(b"ACGTA").unwrap(); let kmer = Kmer::from_ascii(b"ACGTA").unwrap();
let mut g = GraphDeBruijn::new(); let mut g = GraphDeBruijn::new();
g.push(kmer.canonical()); g.push(kmer.canonical());
g.compute_degrees(); g.compute_degrees_and_mark_starts();
let unitigs: Vec<Unitig> = g.iter_unitig().collect(); let unitigs: Vec<Unitig> = collect_unitigs(&g);
assert_eq!(unitigs.len(), 1); assert_eq!(unitigs.len(), 1);
assert_eq!(unitigs[0].seql(), k); assert_eq!(unitigs[0].seql(), k);
} }
@@ -186,8 +181,8 @@ fn unitig_two_truly_distinct_isolated_nodes() {
let mut g = GraphDeBruijn::new(); let mut g = GraphDeBruijn::new();
g.push(Kmer::from_ascii(b"AAAAC").unwrap().canonical()); g.push(Kmer::from_ascii(b"AAAAC").unwrap().canonical());
g.push(Kmer::from_ascii(b"GGGGT").unwrap().canonical()); g.push(Kmer::from_ascii(b"GGGGT").unwrap().canonical());
g.compute_degrees(); g.compute_degrees_and_mark_starts();
let unitigs: Vec<Unitig> = g.iter_unitig().collect(); let unitigs: Vec<Unitig> = collect_unitigs(&g);
// Each isolated node → one unitig of length k // Each isolated node → one unitig of length k
assert_eq!(unitigs.len(), 2); assert_eq!(unitigs.len(), 2);
assert!(unitigs.iter().all(|u| u.seql() == k)); assert!(unitigs.iter().all(|u| u.seql() == k));
@@ -201,8 +196,8 @@ fn no_kmer_lost_or_duplicated() {
set_k(k); set_k(k);
let seq = b"ACGTACGTACGTTTTTACGTACGT"; let seq = b"ACGTACGTACGTTTTTACGTACGT";
let g = graph_from_ascii(seq); let g = graph_from_ascii(seq);
g.compute_degrees(); g.compute_degrees_and_mark_starts();
let unitigs: Vec<Unitig> = g.iter_unitig().collect(); let unitigs: Vec<Unitig> = collect_unitigs(&g);
let got = kmers_from_unitigs(&unitigs); let got = kmers_from_unitigs(&unitigs);
let expected = canonical_kmers(seq); let expected = canonical_kmers(seq);
assert_eq!( assert_eq!(
@@ -226,8 +221,8 @@ fn cycle_kmers_not_lost() {
set_k(k); set_k(k);
let seq = b"ACGTACGT"; let seq = b"ACGTACGT";
let g = graph_from_ascii(seq); let g = graph_from_ascii(seq);
g.compute_degrees(); g.compute_degrees_and_mark_starts();
let unitigs: Vec<Unitig> = g.iter_unitig().collect(); let unitigs: Vec<Unitig> = collect_unitigs(&g);
let got = kmers_from_unitigs(&unitigs); let got = kmers_from_unitigs(&unitigs);
let expected = canonical_kmers(seq); let expected = canonical_kmers(seq);
assert_eq!(got.len(), expected.len(), "cycle k-mers lost"); assert_eq!(got.len(), expected.len(), "cycle k-mers lost");
@@ -269,8 +264,8 @@ fn branching_graph_no_kmer_lost_or_duplicated() {
insert(b"GTGGCTACCGT"); // H-M-N continuation insert(b"GTGGCTACCGT"); // H-M-N continuation
insert(b"TTCGTGGCTA"); // J-I-F (different J prefix) insert(b"TTCGTGGCTA"); // J-I-F (different J prefix)
g.compute_degrees(); g.compute_degrees_and_mark_starts();
let unitigs: Vec<Unitig> = g.iter_unitig().collect(); let unitigs: Vec<Unitig> = collect_unitigs(&g);
// Collect all k-mers from unitigs. // Collect all k-mers from unitigs.
let got = kmers_from_unitigs(&unitigs); let got = kmers_from_unitigs(&unitigs);
+26 -31
View File
@@ -28,9 +28,9 @@ pub fn run(args: UnitigArgs) {
std::process::exit(1); std::process::exit(1);
}); });
let k = idx.kmer_size(); let k = idx.kmer_size();
let n = idx.n_partitions(); let n = idx.n_partitions();
let n_genomes = idx.meta().genomes.len().max(1); let n_genomes = idx.meta().genomes.len().max(1);
let use_counts = idx.meta().config.with_counts; let use_counts = idx.meta().config.with_counts;
info!("unitig: building de Bruijn graph from {n} partition(s) (k={k})"); info!("unitig: building de Bruijn graph from {n} partition(s) (k={k})");
@@ -44,22 +44,22 @@ pub fn run(args: UnitigArgs) {
let stage = Stage::start("build graph"); let stage = Stage::start("build graph");
let g = (0..n) let g = (0..n)
.into_par_iter() .into_par_iter()
.fold( .fold(GraphDeBruijn::new, |mut local_g, i| {
GraphDeBruijn::new, partition
|mut local_g, i| { .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| {
partition local_g.push(kmer);
.iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { })
local_g.push(kmer); .unwrap_or_else(|e| {
}) eprintln!("error reading partition {i}: {e}");
.unwrap_or_else(|e| { std::process::exit(1);
eprintln!("error reading partition {i}: {e}"); });
std::process::exit(1); pb.inc(1);
}); local_g
pb.inc(1); })
local_g .reduce(GraphDeBruijn::new, |mut a, b| {
}, a.merge(b);
) a
.reduce(GraphDeBruijn::new, |mut a, b| { a.merge(b); a }); });
pb.finish_and_clear(); pb.finish_and_clear();
rep.push(stage.stop()); rep.push(stage.stop());
@@ -67,15 +67,17 @@ pub fn run(args: UnitigArgs) {
// ── Phase 2 : compute degrees (in-memory, no progress needed) ──────────── // ── Phase 2 : compute degrees (in-memory, no progress needed) ────────────
let stage = Stage::start("compute degrees"); let stage = Stage::start("compute degrees");
g.compute_degrees(); g.compute_degrees_and_mark_starts();
rep.push(stage.stop()); rep.push(stage.stop());
// ── Phase 3 : enumerate unitigs and write as FASTA ─────────────────────── // ── Phase 3 : enumerate unitigs and write as FASTA ───────────────────────
let pb = spinner("unitig"); let pb = spinner("unitig");
let out = Mutex::new(BufWriter::new(io::stdout())); let out = Mutex::new(BufWriter::new(io::stdout()));
let j = AtomicUsize::new(0); let j = AtomicUsize::new(0);
let write_unitig_fn = |unitig: obikseq::unitig::Unitig| { let stage = Stage::start("enumerate unitigs");
g.for_each_unitig(|nuc_iter| {
let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
let idx = j.fetch_add(1, Ordering::Relaxed); let idx = j.fetch_add(1, Ordering::Relaxed);
let mut w = out.lock().unwrap(); let mut w = out.lock().unwrap();
write_unitig(&unitig, k, 0, idx, &mut *w).unwrap_or_else(|e| { write_unitig(&unitig, k, 0, idx, &mut *w).unwrap_or_else(|e| {
@@ -85,14 +87,7 @@ pub fn run(args: UnitigArgs) {
if idx % 10_000 == 0 { if idx % 10_000 == 0 {
pb.set_message(format!("{idx} unitigs written")); pb.set_message(format!("{idx} unitigs written"));
} }
}; });
let stage = Stage::start("chain unitigs");
g.par_for_each_chain_unitig(&write_unitig_fn);
rep.push(stage.stop());
let stage = Stage::start("remaining unitigs");
g.for_each_remaining_unitig(&write_unitig_fn);
pb.finish_and_clear(); pb.finish_and_clear();
rep.push(stage.stop()); rep.push(stage.stop());
+23 -11
View File
@@ -5,8 +5,8 @@ use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*; use epserde::prelude::*;
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obilayeredmap::{IndexMode, OLMError, layer::Layer};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, OLMError, layer::Layer};
use obiskio::{SKError, SKFileMeta, SKFileReader}; use obiskio::{SKError, SKFileMeta, SKFileReader};
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
@@ -17,7 +17,10 @@ type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u
fn olm_to_sk(e: OLMError) -> SKError { fn olm_to_sk(e: OLMError) -> SKError {
match e { match e {
OLMError::Io(io_err) => SKError::Io(io_err), OLMError::Io(io_err) => SKError::Io(io_err),
other => SKError::InvalidData { context: "layer build", detail: other.to_string() }, other => SKError::InvalidData {
context: "layer build",
detail: other.to_string(),
},
} }
} }
@@ -99,30 +102,39 @@ impl KmerPartition {
} }
let n_kmers = g.len(); let n_kmers = g.len();
g.compute_degrees(); g.compute_degrees_and_mark_starts();
fs::create_dir_all(&layer_dir)?; fs::create_dir_all(&layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?;
for unitig in g.iter_unitig() { g.try_for_each_unitig(|nuc_iter| {
uw.write(&unitig)?; let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
} uw.write(&unitig)
})?;
uw.close()?; uw.close()?;
if with_counts { if with_counts {
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, mode, |kmer| { Layer::<PersistentCompactIntMatrix>::build(
match (&mphf1_opt, &counts1_opt) { &layer_dir,
block_bits,
mode,
|kmer| match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
_ => 1, _ => 1,
} },
}) )
.map_err(olm_to_sk)?; .map_err(olm_to_sk)?;
} else { } else {
Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?; Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?;
} }
let index_dir = layer_dir.parent().expect("layer_dir has a parent"); let index_dir = layer_dir.parent().expect("layer_dir has a parent");
PartitionMeta { n_layers: 1, mode: mode.clone() }.save(index_dir).map_err(olm_to_sk)?; PartitionMeta {
n_layers: 1,
mode: mode.clone(),
}
.save(index_dir)
.map_err(olm_to_sk)?;
Ok(n_kmers) Ok(n_kmers)
} }
+102 -59
View File
@@ -2,22 +2,25 @@ use std::fs;
use std::io; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
};
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentBitVecBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
PersistentCompactIntVecBuilder};
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError};
use obiskio::{SKError, SKResult, UnitigFileReader};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
// ── MergeMode ───────────────────────────────────────────────────────────────── // ── MergeMode ─────────────────────────────────────────────────────────────────
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeMode { Presence, Count } pub enum MergeMode {
Presence,
Count,
}
// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ───────────── // ── ColBuilder — enum dispatch to avoid trait-object boxing issues ─────────────
@@ -36,8 +39,8 @@ impl ColBuilder {
fn close(self) -> SKResult<()> { fn close(self) -> SKResult<()> {
match self { match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io), ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io), ColBuilder::Int(b) => b.close().map_err(SKError::Io),
} }
} }
} }
@@ -56,12 +59,12 @@ impl SrcLayerData {
MergeMode::Presence => { MergeMode::Presence => {
if counts_dir.exists() && !layer_dir.join("presence").exists() { if counts_dir.exists() && !layer_dir.join("presence").exists() {
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Count(mphf, mat)) Ok(SrcLayerData::Count(mphf, mat))
} else { } else {
// presence dir exists, or neither exists → Implicit handled by open() // presence dir exists, or neither exists → Implicit handled by open()
let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?;
let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?;
Ok(SrcLayerData::Presence(mphf, mat)) Ok(SrcLayerData::Presence(mphf, mat))
} }
} }
@@ -86,7 +89,7 @@ impl SrcLayerData {
let mut buf = vec![0u32; n_genomes]; let mut buf = vec![0u32; n_genomes];
match self { match self {
SrcLayerData::Presence(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), SrcLayerData::Presence(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf),
SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf),
} }
buf buf
} }
@@ -101,10 +104,16 @@ const INDEX_SUBDIR: &str = "index";
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> { fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) { match PartitionMeta::load(dir) {
Ok(m) => Ok(m), Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize; let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() { n += 1; } while dir.join(format!("layer_{n}")).exists() {
let m = PartitionMeta { n_layers: n, mode: IndexMode::default() }; n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(olm_to_sk)?; m.save(dir).map_err(olm_to_sk)?;
Ok(m) Ok(m)
} }
@@ -115,7 +124,10 @@ fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
fn olm_to_sk(e: OLMError) -> SKError { fn olm_to_sk(e: OLMError) -> SKError {
match e { match e {
OLMError::Io(e) => SKError::Io(e), OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData { context: "merge", detail: other.to_string() }, other => SKError::InvalidData {
context: "merge",
detail: other.to_string(),
},
} }
} }
@@ -128,7 +140,10 @@ fn col_path_int(dir: &Path, col: usize) -> PathBuf {
} }
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
} }
// ── KmerPartition::merge_partition ──────────────────────────────────────────── // ── KmerPartition::merge_partition ────────────────────────────────────────────
@@ -157,7 +172,7 @@ impl KmerPartition {
return Ok(()); return Ok(());
} }
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open
let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?; let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?;
let n_dst_layers = dst_map.n_layers(); let n_dst_layers = dst_map.n_layers();
let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum();
@@ -178,12 +193,13 @@ impl KmerPartition {
for (src, _) in sources.iter() { for (src, _) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() { continue; } if !src_index_dir.exists() {
continue;
}
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let unitigs_path = src_index_dir let unitigs_path = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin");
.join(format!("layer_{l}")).join("unitigs.bin");
let reader = UnitigFileReader::open_sequential(&unitigs_path)?; let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if dst_map.query(kmer).is_none() { if dst_map.query(kmer).is_none() {
@@ -199,12 +215,13 @@ impl KmerPartition {
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
let n_new = if any_new { let n_new = if any_new {
g.compute_degrees(); g.compute_degrees_and_mark_starts();
fs::create_dir_all(&new_layer_dir)?; fs::create_dir_all(&new_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
for unitig in g.iter_unitig() { g.try_for_each_unitig(|nuc_iter| {
uw.write(&unitig)?; let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
} uw.write(&unitig)
})?;
uw.close()?; uw.close()?;
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
g.len() g.len()
@@ -225,35 +242,47 @@ impl KmerPartition {
let mut new_src_builders: Vec<ColBuilder> = if any_new { let mut new_src_builders: Vec<ColBuilder> = if any_new {
let data_dir = match mode { let data_dir = match mode {
MergeMode::Presence => new_layer_dir.join("presence"), MergeMode::Presence => new_layer_dir.join("presence"),
MergeMode::Count => new_layer_dir.join("counts"), MergeMode::Count => new_layer_dir.join("counts"),
}; };
fs::create_dir_all(&data_dir)?; fs::create_dir_all(&data_dir)?;
match mode { match mode {
MergeMode::Presence => { MergeMode::Presence => {
PersistentBitMatrixBuilder::new(n_new, &data_dir) PersistentBitMatrixBuilder::new(n_new, &data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?; .map_err(SKError::Io)?
.close()
.map_err(SKError::Io)?;
for _ in 0..n_dst_genomes { for _ in 0..n_dst_genomes {
PersistentBitMatrix::append_column(&data_dir, |_| false) PersistentBitMatrix::append_column(&data_dir, |_| false)
.map_err(SKError::Io)?; .map_err(SKError::Io)?;
} }
(0..n_src_total).map(|g| -> SKResult<ColBuilder> { (0..n_src_total)
let b = PersistentBitVecBuilder::new( .map(|g| -> SKResult<ColBuilder> {
n_new, &col_path_bit(&data_dir, n_dst_genomes + g))?; let b = PersistentBitVecBuilder::new(
Ok(ColBuilder::Bit(b)) n_new,
}).collect::<SKResult<_>>()? &col_path_bit(&data_dir, n_dst_genomes + g),
)?;
Ok(ColBuilder::Bit(b))
})
.collect::<SKResult<_>>()?
} }
MergeMode::Count => { MergeMode::Count => {
PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) PersistentCompactIntMatrixBuilder::new(n_new, &data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?; .map_err(SKError::Io)?
.close()
.map_err(SKError::Io)?;
for _ in 0..n_dst_genomes { for _ in 0..n_dst_genomes {
PersistentCompactIntMatrix::append_column(&data_dir, |_| 0) PersistentCompactIntMatrix::append_column(&data_dir, |_| 0)
.map_err(SKError::Io)?; .map_err(SKError::Io)?;
} }
(0..n_src_total).map(|g| -> SKResult<ColBuilder> { (0..n_src_total)
let b = PersistentCompactIntVecBuilder::new( .map(|g| -> SKResult<ColBuilder> {
n_new, &col_path_int(&data_dir, n_dst_genomes + g))?; let b = PersistentCompactIntVecBuilder::new(
Ok(ColBuilder::Int(b)) n_new,
}).collect::<SKResult<_>>()? &col_path_int(&data_dir, n_dst_genomes + g),
)?;
Ok(ColBuilder::Int(b))
})
.collect::<SKResult<_>>()?
} }
} }
} else { } else {
@@ -266,22 +295,28 @@ impl KmerPartition {
.map(|l| { .map(|l| {
let layer_dir = dst_index_dir.join(format!("layer_{l}")); let layer_dir = dst_index_dir.join(format!("layer_{l}"));
let n = dst_map.layer(l).n(); let n = dst_map.layer(l).n();
(0..n_src_total).map(|src_g| -> SKResult<ColBuilder> { (0..n_src_total)
match mode { .map(|src_g| -> SKResult<ColBuilder> {
MergeMode::Presence => { match mode {
let data_dir = layer_dir.join("presence"); MergeMode::Presence => {
let b = PersistentBitVecBuilder::new( let data_dir = layer_dir.join("presence");
n, &col_path_bit(&data_dir, n_dst_genomes + src_g))?; let b = PersistentBitVecBuilder::new(
Ok(ColBuilder::Bit(b)) n,
&col_path_bit(&data_dir, n_dst_genomes + src_g),
)?;
Ok(ColBuilder::Bit(b))
}
MergeMode::Count => {
let data_dir = layer_dir.join("counts");
let b = PersistentCompactIntVecBuilder::new(
n,
&col_path_int(&data_dir, n_dst_genomes + src_g),
)?;
Ok(ColBuilder::Int(b))
}
} }
MergeMode::Count => { })
let data_dir = layer_dir.join("counts"); .collect::<SKResult<_>>()
let b = PersistentCompactIntVecBuilder::new(
n, &col_path_int(&data_dir, n_dst_genomes + src_g))?;
Ok(ColBuilder::Int(b))
}
}
}).collect::<SKResult<_>>()
}) })
.collect::<SKResult<_>>()?; .collect::<SKResult<_>>()?;
@@ -289,7 +324,10 @@ impl KmerPartition {
let mut col_offset = 0usize; let mut col_offset = 0usize;
for (src, src_n) in sources.iter() { for (src, src_n) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() { col_offset += src_n; continue; } if !src_index_dir.exists() {
col_offset += src_n;
continue;
}
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
@@ -316,22 +354,27 @@ impl KmerPartition {
// ── Close builders and update metadata ──────────────────────────────── // ── Close builders and update metadata ────────────────────────────────
for (l, builders) in exist_builders.into_iter().enumerate() { for (l, builders) in exist_builders.into_iter().enumerate() {
let layer_dir = dst_index_dir.join(format!("layer_{l}")); let layer_dir = dst_index_dir.join(format!("layer_{l}"));
for b in builders { b.close()?; } for b in builders {
b.close()?;
}
let n = dst_map.layer(l).n(); let n = dst_map.layer(l).n();
let data_dir = match mode { let data_dir = match mode {
MergeMode::Presence => layer_dir.join("presence"), MergeMode::Presence => layer_dir.join("presence"),
MergeMode::Count => layer_dir.join("counts"), MergeMode::Count => layer_dir.join("counts"),
}; };
write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?; write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?;
} }
for b in new_src_builders { b.close()?; } for b in new_src_builders {
b.close()?;
}
if any_new { if any_new {
let data_dir = match mode { let data_dir = match mode {
MergeMode::Presence => new_layer_dir.join("presence"), MergeMode::Presence => new_layer_dir.join("presence"),
MergeMode::Count => new_layer_dir.join("counts"), MergeMode::Count => new_layer_dir.join("counts"),
}; };
write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total).map_err(SKError::Io)?; write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total)
.map_err(SKError::Io)?;
let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?;
part_meta.n_layers = new_layer_idx + 1; part_meta.n_layers = new_layer_idx + 1;
+66 -33
View File
@@ -2,15 +2,15 @@ use std::fs;
use std::io; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use obicompactvec::{PersistentBitMatrixBuilder, use obicompactvec::{
PersistentBitVecBuilder, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder,
PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
PersistentCompactIntVecBuilder}; };
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError};
use obiskio::{SKError, SKResult, UnitigFileReader};
use crate::filter::{KmerFilter, passes_all}; use crate::filter::{KmerFilter, passes_all};
use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::merge_layer::{MergeMode, SrcLayerData};
@@ -21,7 +21,10 @@ const INDEX_SUBDIR: &str = "index";
fn olm_to_sk(e: OLMError) -> SKError { fn olm_to_sk(e: OLMError) -> SKError {
match e { match e {
OLMError::Io(e) => SKError::Io(e), OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData { context: "rebuild", detail: other.to_string() }, other => SKError::InvalidData {
context: "rebuild",
detail: other.to_string(),
},
} }
} }
@@ -34,7 +37,10 @@ fn col_path_int(dir: &Path, col: usize) -> PathBuf {
} }
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
} }
// ── ColBuilder ──────────────────────────────────────────────────────────────── // ── ColBuilder ────────────────────────────────────────────────────────────────
@@ -54,8 +60,8 @@ impl ColBuilder {
fn close(self) -> SKResult<()> { fn close(self) -> SKResult<()> {
match self { match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io), ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io), ColBuilder::Int(b) => b.close().map_err(SKError::Io),
} }
} }
} }
@@ -65,10 +71,16 @@ impl ColBuilder {
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> { fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
match PartitionMeta::load(dir) { match PartitionMeta::load(dir) {
Ok(m) => Ok(m), Ok(m) => Ok(m),
Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) =>
{
let mut n = 0usize; let mut n = 0usize;
while dir.join(format!("layer_{n}")).exists() { n += 1; } while dir.join(format!("layer_{n}")).exists() {
let m = PartitionMeta { n_layers: n, mode: IndexMode::default() }; n += 1;
}
let m = PartitionMeta {
n_layers: n,
mode: IndexMode::default(),
};
m.save(dir).map_err(olm_to_sk)?; m.save(dir).map_err(olm_to_sk)?;
Ok(m) Ok(m)
} }
@@ -90,10 +102,12 @@ fn iter_src_layers(
let src_meta = load_meta(src_index_dir)?; let src_meta = load_meta(src_index_dir)?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let unitigs_path = src_layer_dir.join("unitigs.bin"); let unitigs_path = src_layer_dir.join("unitigs.bin");
if !unitigs_path.exists() { continue; } if !unitigs_path.exists() {
continue;
}
let reader = UnitigFileReader::open_sequential(&unitigs_path)?; let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?; let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
@@ -146,7 +160,7 @@ impl KmerPartition {
} }
let n_new = g.len(); let n_new = g.len();
g.compute_degrees(); g.compute_degrees_and_mark_starts();
// ── Build MPHF in dst layer_0 ───────────────────────────────────────── // ── Build MPHF in dst layer_0 ─────────────────────────────────────────
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
@@ -154,9 +168,10 @@ impl KmerPartition {
fs::create_dir_all(&dst_layer_dir)?; fs::create_dir_all(&dst_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?;
for unitig in g.iter_unitig() { g.try_for_each_unitig(|nuc_iter| {
uw.write(&unitig)?; let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
} uw.write(&unitig)
})?;
uw.close()?; uw.close()?;
drop(g); drop(g);
@@ -166,26 +181,37 @@ impl KmerPartition {
// ── Prepare matrix builders (one column per genome) ─────────────────── // ── Prepare matrix builders (one column per genome) ───────────────────
let data_dir = match mode { let data_dir = match mode {
MergeMode::Presence => dst_layer_dir.join("presence"), MergeMode::Presence => dst_layer_dir.join("presence"),
MergeMode::Count => dst_layer_dir.join("counts"), MergeMode::Count => dst_layer_dir.join("counts"),
}; };
fs::create_dir_all(&data_dir)?; fs::create_dir_all(&data_dir)?;
let mut builders: Vec<ColBuilder> = match mode { let mut builders: Vec<ColBuilder> = match mode {
MergeMode::Presence => { MergeMode::Presence => {
PersistentBitMatrixBuilder::new(n_new, &data_dir) PersistentBitMatrixBuilder::new(n_new, &data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?; .map_err(SKError::Io)?
(0..n_genomes).map(|g| -> SKResult<ColBuilder> { .close()
let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?; .map_err(SKError::Io)?;
Ok(ColBuilder::Bit(b)) (0..n_genomes)
}).collect::<SKResult<_>>()? .map(|g| -> SKResult<ColBuilder> {
let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?;
Ok(ColBuilder::Bit(b))
})
.collect::<SKResult<_>>()?
} }
MergeMode::Count => { MergeMode::Count => {
PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) PersistentCompactIntMatrixBuilder::new(n_new, &data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?; .map_err(SKError::Io)?
(0..n_genomes).map(|g| -> SKResult<ColBuilder> { .close()
let b = PersistentCompactIntVecBuilder::new(n_new, &col_path_int(&data_dir, g))?; .map_err(SKError::Io)?;
Ok(ColBuilder::Int(b)) (0..n_genomes)
}).collect::<SKResult<_>>()? .map(|g| -> SKResult<ColBuilder> {
let b = PersistentCompactIntVecBuilder::new(
n_new,
&col_path_int(&data_dir, g),
)?;
Ok(ColBuilder::Int(b))
})
.collect::<SKResult<_>>()?
} }
}; };
@@ -199,10 +225,17 @@ impl KmerPartition {
})?; })?;
// ── Close builders, write metadata ──────────────────────────────────── // ── Close builders, write metadata ────────────────────────────────────
for b in builders { b.close()?; } for b in builders {
b.close()?;
}
write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?; write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?;
PartitionMeta { n_layers: 1, mode: IndexMode::Exact }.save(&dst_index_dir).map_err(olm_to_sk)?; PartitionMeta {
n_layers: 1,
mode: IndexMode::Exact,
}
.save(&dst_index_dir)
.map_err(olm_to_sk)?;
Ok(()) Ok(())
} }
+23
View File
@@ -115,6 +115,29 @@ impl PackedSeq {
} }
Self::new(count_to_tail(seql), seq.into_boxed_slice()) Self::new(count_to_tail(seql), seq.into_boxed_slice())
} }
}
impl FromIterator<u8> for PackedSeq {
/// Build from a stream of 2-bit nucleotide values (0=A…3=T).
/// Packs on the fly — one allocation, no intermediate Vec.
fn from_iter<I: IntoIterator<Item = u8>>(iter: I) -> Self {
let iter = iter.into_iter();
let (lower, _) = iter.size_hint();
let mut seq: Vec<u8> = Vec::with_capacity(byte_len(lower.max(1)));
let mut seql = 0usize;
for nuc in iter {
if seql % 4 == 0 {
seq.push(0);
}
*seq.last_mut().unwrap() |= (nuc & 0b11) << (6 - 2 * (seql % 4));
seql += 1;
}
debug_assert!(seql >= 1, "PackedSeq requires at least one nucleotide");
Self::new(count_to_tail(seql), seq.into_boxed_slice())
}
}
impl PackedSeq {
/// Write ASCII nucleotides into `writer`. Zero allocation. /// Write ASCII nucleotides into `writer`. Zero allocation.
pub fn write_ascii<W: Write>(&self, writer: &mut W) -> io::Result<()> { pub fn write_ascii<W: Write>(&self, writer: &mut W) -> io::Result<()> {