♻️ refactor pipeline architecture and fix macOS memory detection

- Replace WorkerPool-based pipelines with typed `Pipe` abstraction in obipipeline
  - Introduce Pipe/PipeIter for composable, sourceless and sinkless pipelines
- Update partition and superkmer commands to use new Pipe API via make_pipe!
  - Remove Arc<Mutex<...>> patterns; simplify state management
- Fix macOS available_memory() returning 0 by falling back to half total memory in dereplicate()
- Remove unused `format: "zstd"` field from partition.meta
This commit is contained in:
Eric Coissac
2026-04-28 08:40:07 +02:00
parent 4c19882f03
commit 97e65bd831
8 changed files with 264 additions and 48 deletions
-1
View File
@@ -2,6 +2,5 @@
"n_bits": 8,
"kmer_size": 31,
"minimizer_size": 11,
"format": "zstd",
"level": 3
}
+7
View File
@@ -44,6 +44,13 @@ pub struct CommonArgs {
pub threads: usize,
}
impl CommonArgs {
    /// Builds an `obiread::PathIter` over the input files listed in `self.inputs`.
    ///
    /// Each entry of `inputs` is converted with `PathBuf::from`; `self` is only borrowed,
    /// so the method can be called repeatedly.
    pub fn seqfile_paths(&self) -> obiread::PathIter {
        obiread::PathIter::new(self.inputs.iter().map(PathBuf::from).collect())
    }
}
// ── Pipeline data carrier ─────────────────────────────────────────────────────
pub enum PipelineData {
+17 -28
View File
@@ -1,10 +1,8 @@
use std::path::PathBuf;
use clap::Args;
use obikpartitionner::KmerPartition;
use obikseq::superkmer::SuperKmer;
use obipipeline::{WorkerPool, make_pipeline};
use std::io;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing::info;
use crate::cli::{CommonArgs, PipelineData, open_chunks};
@@ -23,15 +21,6 @@ pub struct PartitionArgs {
pub common: CommonArgs,
}
// ── Stage functions ───────────────────────────────────────────────────────────
/// Writes one batch of super-kmers through the mutex-protected partition.
///
/// Locks `kp`, forwards `batch` to `KmerPartition::write_batch`, and converts a
/// failure into an `io::Error` with `io::Error::other`.
///
/// # Panics
/// Panics if the mutex guarding the partition is poisoned.
fn write_batch(mut batch: Vec<SuperKmer>, kp: &Mutex<KmerPartition>) -> io::Result<()> {
    let outcome = kp.lock().unwrap().write_batch(&mut batch);
    outcome.map_err(io::Error::other)
}
// ── Entry point ───────────────────────────────────────────────────────────────
pub fn run(args: PartitionArgs) {
@@ -41,30 +30,30 @@ pub fn run(args: PartitionArgs) {
let level_max = args.common.level_max;
let n_workers = args.common.threads.max(1);
let kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force)
let mut kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force)
.unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1)
});
let kp = Arc::new(Mutex::new(kp));
let kp_sink = Arc::clone(&kp);
let paths = args.common.inputs.iter().map(PathBuf::from).collect();
let path_source = obiread::PathIter::new(paths);
let path_source = args.common.seqfile_paths();
let pipeline = make_pipeline! {
PipelineData,
source path_source => Path,
||? open_chunks : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<SuperKmer>,
||? { |path| open_chunks(path) } : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch,
};
WorkerPool::new(pipeline, n_workers, 1).run();
kp.lock().unwrap().close().expect("close error");
for mut batch in pipe.apply(path_source, n_workers, 1) {
kp.write_batch(&mut batch).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1)
});
}
kp.close().expect("close error");
info!("dereplicating...");
kp.lock().unwrap().dereplicate().expect("dereplicate error");
kp.lock().unwrap().count_kmer().expect("count kmer error");
kp.dereplicate().expect("dereplicate error");
kp.count_kmer().expect("count kmer error");
}
+12 -18
View File
@@ -1,11 +1,9 @@
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use clap::Args;
use obifastwrite::write_scatter;
use obikseq::superkmer::SuperKmer;
use obipipeline::{WorkerPool, make_pipeline};
use crate::cli::{CommonArgs, PipelineData, open_chunks};
@@ -19,12 +17,11 @@ pub struct SuperkmerArgs {
fn write_batch(
batch: Vec<SuperKmer>,
out: &Mutex<BufWriter<io::Stdout>>,
out: &mut BufWriter<io::Stdout>,
partition_bits: usize,
k: usize,
m: usize,
) -> io::Result<()> {
let mut w = out.lock().unwrap();
let partition_mask = (1u64 << partition_bits) - 1;
for sk in batch {
let minimizer = sk
@@ -32,7 +29,7 @@ fn write_batch(
.map_err(io::Error::other)?
.canonical(m);
let partition = (minimizer.hash(m) & partition_mask) as usize;
write_scatter(&sk, &mut *w, k, m, partition, minimizer)?;
write_scatter(&sk, out, k, m, partition, minimizer)?;
}
Ok(())
}
@@ -47,21 +44,18 @@ pub fn run(args: SuperkmerArgs) {
let partition_bits = args.common.partition_bits;
let n_workers = args.common.threads.max(1);
let paths = args.common.inputs.iter().map(PathBuf::from).collect();
let path_source = obiread::PathIter::new(paths);
let path_source = args.common.seqfile_paths();
let out = Arc::new(Mutex::new(BufWriter::new(io::stdout())));
let out_sink = Arc::clone(&out);
let pipeline = make_pipeline! {
PipelineData,
source path_source => Path,
||? open_chunks : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<SuperKmer>,
||? { |path| open_chunks(path) } : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch,
};
WorkerPool::new(pipeline, n_workers, 1).run();
out.lock().unwrap().flush().expect("flush error");
let mut out = BufWriter::new(io::stdout());
for batch in pipe.apply(path_source, n_workers, 1) {
write_batch(batch, &mut out, partition_bits, k, m).expect("write error");
}
out.flush().expect("flush error");
}
+7 -1
View File
@@ -186,7 +186,13 @@ impl KmerPartition {
pub fn dereplicate(&self) -> SKResult<()> {
let level = self.level;
let root = &self.root_path;
let available = System::new_all().available_memory();
let sys = System::new_all();
// available_memory() can return 0 on macOS when the compressor page count exceeds
// free+inactive+purgeable pages (sysinfo saturating_sub). Fall back to half of total.
let available = match sys.available_memory() {
0 => sys.total_memory() / 2,
n => n,
};
let n_threads = rayon::current_num_threads().max(1) as u64;
let available_per_thread = available / n_threads;
+27
View File
@@ -161,6 +161,33 @@ impl Kmer {
pub fn hash(&self, k: usize) -> u64 {
    // The hash is taken over the canonical form, not over `self` directly.
    let canonical = self.canonical(k);
    mix64(canonical.0)
}
/// Returns the four canonical left neighbors of this kmer.
///
/// The packed value is shifted right by one 2-bit slot and masked to its top
/// `2 * k` bits; each 2-bit code (0, 1, 2, 3, in that order) is then placed in the
/// two most significant bits and the result is canonicalised with `canonical(k)`.
///
/// No heap allocation: the result is a fixed-size array.
pub fn left_canonical_neighbors(&self, k: usize) -> [Kmer; 4] {
    let top_2k_bits = !0u64 << (64 - 2 * k);
    let shifted = (self.0 >> 2) & top_2k_bits;
    [0u64, 1, 2, 3].map(|code| Kmer(shifted | (code << 62)).canonical(k))
}
/// Returns the four canonical right neighbors of this kmer.
///
/// The packed value is shifted left by one 2-bit slot and masked to its top
/// `2 * (k - 1)` bits; each 2-bit code (0, 1, 2, 3, in that order) is then placed in
/// the k-th slot (bit offset `64 - 2 * k`) and the result is canonicalised with
/// `canonical(k)`.
///
/// Zero allocation — result lives on the stack.
pub fn right_canonical_neighbors(&self, k: usize) -> [Kmer; 4] {
    // Bit offset of the k-th (last) 2-bit slot.
    let shift = 64 - 2 * k;
    // Top `2 * (k - 1)` bits = top `2 * k` bits minus the last slot. Deriving the mask
    // from `shift` avoids `!0u64 << (64 - 2 * (k - 1))`, whose shift amount is 64 when
    // `k == 1` (overflow: panic in debug builds, unmasked value in release builds).
    let keep_prefix = (!0u64 << shift) & !(3u64 << shift);
    let shifted = (self.0 << 2) & keep_prefix;
    [
        Kmer(shifted).canonical(k),
        Kmer(shifted | (1u64 << shift)).canonical(k),
        Kmer(shifted | (2u64 << shift)).canonical(k),
        Kmer(shifted | (3u64 << shift)).canonical(k),
    ]
}
}
// ── tests ─────────────────────────────────────────────────────────────────────
+2
View File
@@ -1,5 +1,7 @@
mod scheduler;
pub use scheduler::Pipe;
pub use scheduler::PipeIter;
pub use scheduler::Pipeline;
pub use scheduler::PipelineError;
pub use scheduler::SharedFlatFn;
+192
View File
@@ -1,6 +1,7 @@
use crossbeam_channel::{Receiver, Select, Sender, bounded};
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use std::thread;
@@ -371,6 +372,115 @@ where
}
}
// ── Pipe ──────────────────────────────────────────────────────────────────────
/// Typed, composable iterator transformer.
///
/// A `Pipe<D, In, Out>` is a pure description of pipeline stages — no threads,
/// no channels, no scheduler. Call `.apply(iter, n_workers, capacity)` to start
/// execution and get back a `PipeIter<Out>`.
///
/// Compose two pipes with `.then()`: the resulting `Pipe` holds the concatenated
/// stage list, so a single scheduler is created when `.apply()` is eventually called.
///
/// Type parameters: `D` is the carrier type flowing between stages, `In` is the
/// item type accepted by `.apply()`, and `Out` is the item type yielded by the
/// resulting `PipeIter`.
pub struct Pipe<D, In, Out> {
/// Ordered stage list handed to the `Pipeline` when `.apply()` is called.
stages: Vec<Stage<D>>,
/// Converts an input item into the carrier type `D`.
wrap: Arc<dyn Fn(In) -> D + Send + Sync>,
/// Converts a carrier value `D` into an output item.
unwrap: Arc<dyn Fn(D) -> Out + Send + Sync>,
/// Zero-sized marker for `In` and `Out`.
_phantom: PhantomData<(In, Out)>,
}
impl<D, In, Out> Pipe<D, In, Out> {
    /// Assembles a `Pipe` from an explicit stage list and its two converters.
    ///
    /// `wrap` turns an `In` item into the carrier type `D`; `unwrap` turns a carrier
    /// value back into `Out`. The `make_pipe!` macro is the preferred way to build a
    /// `Pipe`.
    pub fn new(
        stages: Vec<Stage<D>>,
        wrap: Arc<dyn Fn(In) -> D + Send + Sync>,
        unwrap: Arc<dyn Fn(D) -> Out + Send + Sync>,
    ) -> Self {
        Pipe {
            stages,
            wrap,
            unwrap,
            _phantom: PhantomData,
        }
    }

    /// Chains `other` after `self`, producing one pipe with the combined stage list.
    ///
    /// The type system requires the `Out` of `self` to be the `In` of `other`.
    /// The result keeps `self`'s `wrap` and `other`'s `unwrap`; `self`'s `unwrap`
    /// and `other`'s `wrap` are discarded, so the carrier value leaving `self`'s last
    /// stage is passed unchanged to `other`'s first stage. A single scheduler created
    /// at `.apply()` time sees the full stage list.
    pub fn then<Next>(self, other: Pipe<D, Out, Next>) -> Pipe<D, In, Next> {
        let Pipe { mut stages, wrap, .. } = self;
        stages.extend(other.stages);
        Pipe {
            stages,
            wrap,
            unwrap: other.unwrap,
            _phantom: PhantomData,
        }
    }
}
impl<D, In, Out> Pipe<D, In, Out>
where
D: Send + Sync + 'static,
In: Send + 'static,
Out: Send + 'static,
{
/// Run the pipeline in a background thread; returns an iterator over the output.
pub fn apply(
self,
input: impl Iterator<Item = In> + Send + 'static,
n_workers: usize,
capacity: usize,
) -> PipeIter<Out> {
let wrap = Arc::clone(&self.wrap);
let unwrap = Arc::clone(&self.unwrap);
let mut iter = input;
let source: SourceFn<D> = Box::new(move || match iter.next() {
Some(x) => Ok(wrap(x)),
None => Err(PipelineError::EndOfStream),
});
let (out_tx, out_rx) = bounded::<Out>(capacity);
let sink: SinkFn<D> = Box::new(move |data: D| {
out_tx.send(unwrap(data)).map_err(|_| {
PipelineError::StepError(Box::new(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"output channel closed",
)))
})
});
let pipeline = Pipeline::new(source, self.stages, sink);
let handle = thread::spawn(move || {
WorkerPool::new(pipeline, n_workers, capacity).run();
});
PipeIter { rx: out_rx, handle: Some(handle) }
}
}
// ── PipeIter ──────────────────────────────────────────────────────────────────
/// Iterator over the output of `Pipe::apply()`.
///
/// Yields items received from the pipeline's output channel and ends when that
/// channel is disconnected. Dropping the iterator joins the background thread.
pub struct PipeIter<Out> {
/// Receiving end of the bounded output channel fed by the pipeline sink.
rx: Receiver<Out>,
/// Handle of the thread running the `WorkerPool`; taken (set to `None`) on drop.
handle: Option<thread::JoinHandle<()>>,
}
impl<Out> Iterator for PipeIter<Out> {
    type Item = Out;

    /// Blocks until the pipeline delivers the next item.
    ///
    /// Returns `None` once the channel reports an error on receive, i.e. when it is
    /// empty and disconnected.
    fn next(&mut self) -> Option<Out> {
        match self.rx.recv() {
            Ok(item) => Some(item),
            Err(_) => None,
        }
    }
}
impl<Out> Drop for PipeIter<Out> {
    /// Waits for the background pipeline thread to finish.
    ///
    /// Any output still produced by the pipeline is received and discarded first, so
    /// the sink can never stay blocked on a full channel while this method waits.
    fn drop(&mut self) {
        // `self.rx` is only dropped after this function returns, so during `join()` the
        // channel is still connected and the sink's `send` blocks whenever the bounded
        // buffer is full. A non-blocking drain frees only the slots filled so far; if the
        // iterator is dropped early the buffer refills and `join()` would wait forever.
        // Keep receiving until every sender is gone (`recv()` then returns `Err`).
        while self.rx.recv().is_ok() {}
        if let Some(worker) = self.handle.take() {
            // A panic in the worker thread shows up as `Err` here and is ignored.
            let _ = worker.join();
        }
    }
}
/// Sends `data` to stage `stage_idx`.
/// For a `Transform`, pushes a `WorkerTask::Transform`.
/// For a `Flat`, increments `flat_workers_active` and pushes a `WorkerTask::Flat`.
@@ -684,3 +794,85 @@ macro_rules! make_pipeline {
)
};
}
/// Builds a typed `Pipe<D, In, Out>` — sourceless and sinkless.
///
/// Syntax:
/// ```ignore
/// make_pipe! {
/// MyData : InType => OutType,
/// | func : InVariant => OutVariant, // transform 1→1
/// |? func : InVariant => OutVariant, // transform 1→1 fallible
/// || func : InVariant => OutVariant, // flat transform 1→N
/// ||? func : InVariant => OutVariant, // flat transform 1→N fallible
/// }
/// ```
///
/// Notes:
/// - At least one stage is required, and every stage line must end with a comma.
/// - `func` must be a single token tree: an identifier or a `{ ... }` block.
/// - Input items are wrapped in the `InVariant` of the first stage; output items are
///   extracted from the `OutVariant` of the last stage. A carrier value holding any
///   other variant at the end of the pipe hits `unreachable!`.
/// - The macro does not check that the `OutVariant` of one stage matches the
///   `InVariant` of the next.
#[macro_export]
macro_rules! make_pipe {
// ── Entry: first stage | ─────────────────────────────────────────────
($enum:ident : $in_ty:ty => $out_ty:ty,
| $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$crate::make_transform!($enum, $tf, $fi, $fo),], $fo, $($rest)*)
};
// ── Entry: first stage |? ────────────────────────────────────────────
($enum:ident : $in_ty:ty => $out_ty:ty,
|? $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$crate::make_transform_fallible!($enum, $tf, $fi, $fo),], $fo, $($rest)*)
};
// ── Entry: first stage || ────────────────────────────────────────────
($enum:ident : $in_ty:ty => $out_ty:ty,
|| $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$crate::make_flat_transform!($enum, $tf, $fi, $fo),], $fo, $($rest)*)
};
// ── Entry: first stage ||? ───────────────────────────────────────────
($enum:ident : $in_ty:ty => $out_ty:ty,
||? $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$crate::make_flat_transform_fallible!($enum, $tf, $fi, $fo),], $fo, $($rest)*)
};
// ── Accumulation: | ──────────────────────────────────────────────────
// `@build` state: enum, In => Out types, first input variant `$fi`,
// accumulated stage expressions `[...]`, last output variant `$lo`, remaining tokens.
(@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident,
[$($acc:tt)*], $lo:ident,
| $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$($acc)* $crate::make_transform!($enum, $tf, $ti, $to),], $to, $($rest)*)
};
// ── Accumulation: |? ─────────────────────────────────────────────────
(@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident,
[$($acc:tt)*], $lo:ident,
|? $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$($acc)* $crate::make_transform_fallible!($enum, $tf, $ti, $to),], $to, $($rest)*)
};
// ── Accumulation: || ─────────────────────────────────────────────────
(@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident,
[$($acc:tt)*], $lo:ident,
|| $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$($acc)* $crate::make_flat_transform!($enum, $tf, $ti, $to),], $to, $($rest)*)
};
// ── Accumulation: ||? ────────────────────────────────────────────────
(@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident,
[$($acc:tt)*], $lo:ident,
||? $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => {
$crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi,
[$($acc)* $crate::make_flat_transform_fallible!($enum, $tf, $ti, $to),], $to, $($rest)*)
};
// ── Termination ───────────────────────────────────────────────────────
// No stage tokens left: emit `Pipe::new` with the accumulated stages, a `wrap`
// closure building `$enum::$fi`, and an `unwrap` closure extracting `$enum::$lo`.
(@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident,
[$($acc:tt)*], $lo:ident $(,)?) => {
$crate::Pipe::new(
vec![$($acc)*],
::std::sync::Arc::new(|x: $in_ty| $enum::$fi(x)),
::std::sync::Arc::new(|d: $enum| -> $out_ty {
if let $enum::$lo(x) = d { x }
else { ::std::unreachable!("unexpected pipeline data variant in make_pipe!") }
}),
)
};
}