diff --git a/Betula_exilis--IGA-24-33/partition.meta b/Betula_exilis--IGA-24-33/partition.meta index 2a61c0b..89a2c1a 100644 --- a/Betula_exilis--IGA-24-33/partition.meta +++ b/Betula_exilis--IGA-24-33/partition.meta @@ -2,6 +2,5 @@ "n_bits": 8, "kmer_size": 31, "minimizer_size": 11, - "format": "zstd", "level": 3 } \ No newline at end of file diff --git a/src/obikmer/src/cli.rs b/src/obikmer/src/cli.rs index 8876b4d..f922b15 100644 --- a/src/obikmer/src/cli.rs +++ b/src/obikmer/src/cli.rs @@ -44,6 +44,13 @@ pub struct CommonArgs { pub threads: usize, } +impl CommonArgs { + pub fn seqfile_paths(&self) -> obiread::PathIter { + let paths = self.inputs.iter().map(PathBuf::from).collect(); + obiread::PathIter::new(paths) + } +} + // ── Pipeline data carrier ───────────────────────────────────────────────────── pub enum PipelineData { diff --git a/src/obikmer/src/cmd/partition.rs b/src/obikmer/src/cmd/partition.rs index 5a8de05..29ccbe3 100644 --- a/src/obikmer/src/cmd/partition.rs +++ b/src/obikmer/src/cmd/partition.rs @@ -1,10 +1,8 @@ +use std::path::PathBuf; + use clap::Args; use obikpartitionner::KmerPartition; use obikseq::superkmer::SuperKmer; -use obipipeline::{WorkerPool, make_pipeline}; -use std::io; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; use tracing::info; use crate::cli::{CommonArgs, PipelineData, open_chunks}; @@ -23,15 +21,6 @@ pub struct PartitionArgs { pub common: CommonArgs, } -// ── Stage functions ─────────────────────────────────────────────────────────── - -fn write_batch(mut batch: Vec, kp: &Mutex) -> io::Result<()> { - kp.lock() - .unwrap() - .write_batch(&mut batch) - .map_err(io::Error::other) -} - // ── Entry point ─────────────────────────────────────────────────────────────── pub fn run(args: PartitionArgs) { @@ -41,30 +30,30 @@ pub fn run(args: PartitionArgs) { let level_max = args.common.level_max; let n_workers = args.common.threads.max(1); - let kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force) + let mut kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force) .unwrap_or_else(|e| { eprintln!("error: {e}"); std::process::exit(1) }); - let kp = Arc::new(Mutex::new(kp)); - let kp_sink = Arc::clone(&kp); - let paths = args.common.inputs.iter().map(PathBuf::from).collect(); - let path_source = obiread::PathIter::new(paths); + let path_source = args.common.seqfile_paths(); - let pipeline = make_pipeline! { - PipelineData, - source path_source => Path, - ||? open_chunks : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + let pipe = obipipeline::make_pipe! { + PipelineData : PathBuf => Vec, + ||? { |path| open_chunks(path) } : Path => RawChunk, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, | { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch, - sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch, }; - WorkerPool::new(pipeline, n_workers, 1).run(); - kp.lock().unwrap().close().expect("close error"); + for mut batch in pipe.apply(path_source, n_workers, 1) { + kp.write_batch(&mut batch).unwrap_or_else(|e| { + eprintln!("error: {e}"); + std::process::exit(1) + }); + } + kp.close().expect("close error"); info!("dereplicating..."); - kp.lock().unwrap().dereplicate().expect("dereplicate error"); - kp.lock().unwrap().count_kmer().expect("count kmer error"); + kp.dereplicate().expect("dereplicate error"); + kp.count_kmer().expect("count kmer error"); } diff --git a/src/obikmer/src/cmd/superkmer.rs b/src/obikmer/src/cmd/superkmer.rs index 88db465..cb5f8ca 100644 --- a/src/obikmer/src/cmd/superkmer.rs +++ b/src/obikmer/src/cmd/superkmer.rs @@ -1,11 +1,9 @@ use std::io::{self, BufWriter, Write}; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; use clap::Args; use obifastwrite::write_scatter; use obikseq::superkmer::SuperKmer; -use obipipeline::{WorkerPool, make_pipeline}; use crate::cli::{CommonArgs, PipelineData, open_chunks}; @@ -19,12 +17,11 @@ pub struct SuperkmerArgs { fn write_batch( batch: Vec, - out: &Mutex>, + out: &mut BufWriter, partition_bits: usize, k: usize, m: usize, ) -> io::Result<()> { - let mut w = out.lock().unwrap(); let partition_mask = (1u64 << partition_bits) - 1; for sk in batch { let minimizer = sk @@ -32,7 +29,7 @@ fn write_batch( .map_err(io::Error::other)? .canonical(m); let partition = (minimizer.hash(m) & partition_mask) as usize; - write_scatter(&sk, &mut *w, k, m, partition, minimizer)?; + write_scatter(&sk, out, k, m, partition, minimizer)?; } Ok(()) } @@ -47,21 +44,18 @@ pub fn run(args: SuperkmerArgs) { let partition_bits = args.common.partition_bits; let n_workers = args.common.threads.max(1); - let paths = args.common.inputs.iter().map(PathBuf::from).collect(); - let path_source = obiread::PathIter::new(paths); + let path_source = args.common.seqfile_paths(); - let out = Arc::new(Mutex::new(BufWriter::new(io::stdout()))); - let out_sink = Arc::clone(&out); - - let pipeline = make_pipeline! { - PipelineData, - source path_source => Path, - ||? open_chunks : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + let pipe = obipipeline::make_pipe! { + PipelineData : PathBuf => Vec, + ||? { |path| open_chunks(path) } : Path => RawChunk, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, | { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch, - sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch, }; - WorkerPool::new(pipeline, n_workers, 1).run(); - out.lock().unwrap().flush().expect("flush error"); + let mut out = BufWriter::new(io::stdout()); + for batch in pipe.apply(path_source, n_workers, 1) { + write_batch(batch, &mut out, partition_bits, k, m).expect("write error"); + } + out.flush().expect("flush error"); } diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index 52783f4..b2c2e5d 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -186,7 +186,13 @@ impl KmerPartition { pub fn dereplicate(&self) -> SKResult<()> { let level = self.level; let root = &self.root_path; - let available = System::new_all().available_memory(); + let sys = System::new_all(); + // available_memory() can return 0 on macOS when the compressor page count exceeds + // free+inactive+purgeable pages (sysinfo saturating_sub). Fall back to half of total. + let available = match sys.available_memory() { + 0 => sys.total_memory() / 2, + n => n, + }; let n_threads = rayon::current_num_threads().max(1) as u64; let available_per_thread = available / n_threads; diff --git a/src/obikseq/src/kmer.rs b/src/obikseq/src/kmer.rs index ad24534..c1d8210 100644 --- a/src/obikseq/src/kmer.rs +++ b/src/obikseq/src/kmer.rs @@ -161,6 +161,33 @@ impl Kmer { pub fn hash(&self, k: usize) -> u64 { mix64(self.canonical(k).0) } + + /// Return the left canonical neighbors of this kmer. + /// + /// Zero allocation — result lives on the stack. + pub fn left_canonical_neighbors(&self, k: usize) -> [Kmer; 4] { + let shifted = (self.0 >> 2) & (!0u64 << (64 - 2 * k)); + [ + Kmer(shifted).canonical(k), + Kmer(shifted | (1u64 << 62)).canonical(k), + Kmer(shifted | (2u64 << 62)).canonical(k), + Kmer(shifted | (3u64 << 62)).canonical(k), + ] + } + + /// Return the right canonical neighbors of this kmer. + /// + /// Zero allocation — result lives on the stack. + pub fn right_canonical_neighbors(&self, k: usize) -> [Kmer; 4] { + let shifted = self.0 << 2 & (!0u64 << (64 - 2 * (k - 1))); + let shift = 64 - 2 * k; + [ + Kmer(shifted).canonical(k), + Kmer(shifted | (1u64 << shift)).canonical(k), + Kmer(shifted | (2u64 << shift)).canonical(k), + Kmer(shifted | (3u64 << shift)).canonical(k), + ] + } } // ── tests ───────────────────────────────────────────────────────────────────── diff --git a/src/obipipeline/src/lib.rs b/src/obipipeline/src/lib.rs index 82b4999..92f8a71 100644 --- a/src/obipipeline/src/lib.rs +++ b/src/obipipeline/src/lib.rs @@ -1,5 +1,7 @@ mod scheduler; +pub use scheduler::Pipe; +pub use scheduler::PipeIter; pub use scheduler::Pipeline; pub use scheduler::PipelineError; pub use scheduler::SharedFlatFn; diff --git a/src/obipipeline/src/scheduler.rs b/src/obipipeline/src/scheduler.rs index 5cf0540..956570b 100644 --- a/src/obipipeline/src/scheduler.rs +++ b/src/obipipeline/src/scheduler.rs @@ -1,6 +1,7 @@ use crossbeam_channel::{Receiver, Select, Sender, bounded}; use std::error::Error; use std::fmt; +use std::marker::PhantomData; use std::sync::Arc; use std::thread; @@ -371,6 +372,115 @@ where } } +// ── Pipe ────────────────────────────────────────────────────────────────────── + +/// Typed, composable iterator transformer. +/// +/// A `Pipe` is a pure description of pipeline stages — no threads, +/// no channels, no scheduler. Call `.apply(iter, n_workers, capacity)` to start +/// execution and get back a `PipeIter`. +/// +/// Compose two pipes with `.then()`: the resulting `Pipe` holds the concatenated +/// stage list, so a single scheduler is created when `.apply()` is eventually called. +pub struct Pipe { + stages: Vec>, + wrap: Arc D + Send + Sync>, + unwrap: Arc Out + Send + Sync>, + _phantom: PhantomData<(In, Out)>, +} + +impl Pipe { + /// Build a `Pipe` from stages and wrap/unwrap converters. + /// Prefer the `make_pipe!` macro. + pub fn new( + stages: Vec>, + wrap: Arc D + Send + Sync>, + unwrap: Arc Out + Send + Sync>, + ) -> Self { + Self { stages, wrap, unwrap, _phantom: PhantomData } + } + + /// Concatenate stages from two pipes into one. + /// + /// Requires `Out` of `self` == `In` of `other`. The single scheduler + /// created at `.apply()` time sees the full combined stage list. + pub fn then(self, other: Pipe) -> Pipe { + Pipe { + stages: self.stages.into_iter().chain(other.stages).collect(), + wrap: self.wrap, + unwrap: other.unwrap, + _phantom: PhantomData, + } + } +} + +impl Pipe +where + D: Send + Sync + 'static, + In: Send + 'static, + Out: Send + 'static, +{ + /// Run the pipeline in a background thread; returns an iterator over the output. + pub fn apply( + self, + input: impl Iterator + Send + 'static, + n_workers: usize, + capacity: usize, + ) -> PipeIter { + let wrap = Arc::clone(&self.wrap); + let unwrap = Arc::clone(&self.unwrap); + + let mut iter = input; + let source: SourceFn = Box::new(move || match iter.next() { + Some(x) => Ok(wrap(x)), + None => Err(PipelineError::EndOfStream), + }); + + let (out_tx, out_rx) = bounded::(capacity); + let sink: SinkFn = Box::new(move |data: D| { + out_tx.send(unwrap(data)).map_err(|_| { + PipelineError::StepError(Box::new(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "output channel closed", + ))) + }) + }); + + let pipeline = Pipeline::new(source, self.stages, sink); + let handle = thread::spawn(move || { + WorkerPool::new(pipeline, n_workers, capacity).run(); + }); + + PipeIter { rx: out_rx, handle: Some(handle) } + } +} + +// ── PipeIter ────────────────────────────────────────────────────────────────── + +/// Iterator over the output of `Pipe::apply()`. +pub struct PipeIter { + rx: Receiver, + handle: Option>, +} + +impl Iterator for PipeIter { + type Item = Out; + + fn next(&mut self) -> Option { + self.rx.recv().ok() + } +} + +impl Drop for PipeIter { + fn drop(&mut self) { + // Drain buffered items so the scheduler can unblock if the channel is full. + while self.rx.try_recv().is_ok() {} + if let Some(h) = self.handle.take() { + let _ = h.join(); + } + } +} + /// Envoie `data` au stage `stage_idx`. /// Pour un `Transform`, empile une `WorkerTask::Transform`. /// Pour un `Flat`, incrémente `flat_workers_active` et empile une `WorkerTask::Flat`. @@ -684,3 +794,85 @@ macro_rules! make_pipeline { ) }; } + +/// Builds a typed `Pipe` — sourceless and sinkless. +/// +/// Syntax: +/// ```ignore +/// make_pipe! { +/// MyData : InType => OutType, +/// | func : InVariant => OutVariant, // transform 1→1 +/// |? func : InVariant => OutVariant, // transform 1→1 fallible +/// || func : InVariant => OutVariant, // flat transform 1→N +/// ||? func : InVariant => OutVariant, // flat transform 1→N fallible +/// } +/// ``` +#[macro_export] +macro_rules! make_pipe { + // ── Entry: first stage | ───────────────────────────────────────────── + ($enum:ident : $in_ty:ty => $out_ty:ty, + | $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$crate::make_transform!($enum, $tf, $fi, $fo),], $fo, $($rest)*) + }; + // ── Entry: first stage |? ──────────────────────────────────────────── + ($enum:ident : $in_ty:ty => $out_ty:ty, + |? $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$crate::make_transform_fallible!($enum, $tf, $fi, $fo),], $fo, $($rest)*) + }; + // ── Entry: first stage || ──────────────────────────────────────────── + ($enum:ident : $in_ty:ty => $out_ty:ty, + || $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$crate::make_flat_transform!($enum, $tf, $fi, $fo),], $fo, $($rest)*) + }; + // ── Entry: first stage ||? ─────────────────────────────────────────── + ($enum:ident : $in_ty:ty => $out_ty:ty, + ||? $tf:tt : $fi:ident => $fo:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$crate::make_flat_transform_fallible!($enum, $tf, $fi, $fo),], $fo, $($rest)*) + }; + + // ── Accumulation: | ────────────────────────────────────────────────── + (@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident, + [$($acc:tt)*], $lo:ident, + | $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$($acc)* $crate::make_transform!($enum, $tf, $ti, $to),], $to, $($rest)*) + }; + // ── Accumulation: |? ───────────────────────────────────────────────── + (@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident, + [$($acc:tt)*], $lo:ident, + |? $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$($acc)* $crate::make_transform_fallible!($enum, $tf, $ti, $to),], $to, $($rest)*) + }; + // ── Accumulation: || ───────────────────────────────────────────────── + (@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident, + [$($acc:tt)*], $lo:ident, + || $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$($acc)* $crate::make_flat_transform!($enum, $tf, $ti, $to),], $to, $($rest)*) + }; + // ── Accumulation: ||? ──────────────────────────────────────────────── + (@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident, + [$($acc:tt)*], $lo:ident, + ||? $tf:tt : $ti:ident => $to:ident, $($rest:tt)*) => { + $crate::make_pipe!(@build $enum : $in_ty => $out_ty, $fi, + [$($acc)* $crate::make_flat_transform_fallible!($enum, $tf, $ti, $to),], $to, $($rest)*) + }; + + // ── Termination ─────────────────────────────────────────────────────── + (@build $enum:ident : $in_ty:ty => $out_ty:ty, $fi:ident, + [$($acc:tt)*], $lo:ident $(,)?) => { + $crate::Pipe::new( + vec![$($acc)*], + ::std::sync::Arc::new(|x: $in_ty| $enum::$fi(x)), + ::std::sync::Arc::new(|d: $enum| -> $out_ty { + if let $enum::$lo(x) = d { x } + else { ::std::unreachable!("unexpected pipeline data variant in make_pipe!") } + }), + ) + }; +}