From 7efec54b275b4a133adb4948a0f03280521f2d04 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 27 Apr 2026 16:53:42 +0200 Subject: [PATCH] obikmer: add partition subcommand backed by KmerPartition - Replace the PartitionManager stub in obikpartitionner with KmerPartition - Wire up the obikmer partition subcommand with --output and --force options - Pass the partition index instead of partition_bits to write_scatter in superkmer - Dispatch obipipeline worker tasks by stage index --- Betula_exilis--IGA-24-33/partition.meta | 7 + src/Cargo.lock | 5 + src/obikmer/Cargo.toml | 3 +- src/obikmer/src/cmd/mod.rs | 1 + src/obikmer/src/cmd/partition.rs | 62 +++----- src/obikmer/src/cmd/superkmer.rs | 2 +- src/obikmer/src/main.rs | 3 + src/obikpartitionner/Cargo.toml | 4 + src/obikpartitionner/src/lib.rs | 4 +- src/obikpartitionner/src/manager.rs | 18 --- src/obikpartitionner/src/partition.rs | 196 ++++++++++++++++++++++++ src/obipipeline/src/scheduler.rs | 70 +++++---- 12 files changed, 281 insertions(+), 94 deletions(-) create mode 100644 Betula_exilis--IGA-24-33/partition.meta delete mode 100644 src/obikpartitionner/src/manager.rs create mode 100644 src/obikpartitionner/src/partition.rs diff --git a/Betula_exilis--IGA-24-33/partition.meta b/Betula_exilis--IGA-24-33/partition.meta new file mode 100644 index 0000000..2a61c0b --- /dev/null +++ b/Betula_exilis--IGA-24-33/partition.meta @@ -0,0 +1,7 @@ +{ + "n_bits": 8, + "kmer_size": 31, + "minimizer_size": 11, + "format": "zstd", + "level": 3 +} \ No newline at end of file diff --git a/src/Cargo.lock b/src/Cargo.lock index e6bf410..59b0d35 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -771,6 +771,7 @@ version = "0.1.0" dependencies = [ "clap", "obifastwrite", + "obikpartitionner", "obikrope", "obikseq", "obipipeline", @@ -782,7 +783,11 @@ dependencies = [ name = "obikpartitionner" version = "0.1.0" dependencies = [ + "niffler 3.0.0", + "obikseq", "obiskio", + "serde", + "serde_json", ] [[package]] diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index 75f78d8..b71562e 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -14,4 +14,5 @@ obiskbuilder = { path = "../obiskbuilder" } obifastwrite = {
path = "../obifastwrite" } obipipeline = { path = "../obipipeline" } clap = { version = "4", features = ["derive"] } -obikrope = { path = "../obikrope" } +obikrope = { path = "../obikrope" } +obikpartitionner = { path = "../obikpartitionner" } diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index b1d2152..e259487 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -1 +1,2 @@ +pub mod partition; pub mod superkmer; diff --git a/src/obikmer/src/cmd/partition.rs b/src/obikmer/src/cmd/partition.rs index 42f2d28..c8318ad 100644 --- a/src/obikmer/src/cmd/partition.rs +++ b/src/obikmer/src/cmd/partition.rs @@ -1,16 +1,20 @@ -use std::io::{self, BufWriter, Write}; +use std::io; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use clap::Args; -use obifastwrite::write_scatter; use obikrope::Rope; use obikseq::superkmer::SuperKmer; use obipipeline::{WorkerPool, make_pipeline}; use obiskbuilder::SuperKmerIter; +use obikpartitionner::KmerPartition; #[derive(Args)] pub struct PartitionArgs { + /// Output partition directory + #[arg(short, long)] + pub output: PathBuf, + /// Input files or directories (FASTA/FASTQ, optionally gzip-compressed) #[arg(num_args = 1..)] pub inputs: Vec, @@ -35,6 +39,10 @@ pub struct PartitionArgs { #[arg(short, long, default_value_t = 8)] pub partition_bits: usize, + /// Overwrite output directory if it already exists + #[arg(long, default_value_t = false)] + pub force: bool, + /// Number of worker threads #[arg( short = 'T', @@ -59,8 +67,6 @@ unsafe impl Sync for PipelineData {} // ── Stage functions ─────────────────────────────────────────────────────────── -/// Opens a sequence file and returns an iterator over its raw Rope chunks. -/// Chunk-level I/O errors are logged and skipped. 
fn open_chunks(path: PathBuf) -> io::Result> { let path_str = path .to_str() @@ -75,12 +81,10 @@ fn open_chunks(path: PathBuf) -> io::Result> { })) } -/// Normalises a raw sequence chunk (FASTA or FASTQ) into a compact ACGT/NUL rope. fn normalize(rope: Rope, k: usize) -> io::Result { obiread::normalize_sequence_chunk(rope, k) } -/// Extracts all super-kmers from a normalised rope. fn build_superkmers( rope: Rope, k: usize, @@ -91,34 +95,11 @@ fn build_superkmers( SuperKmerIter::new(&rope, k, m, level_max, theta).collect() } -/// Writes a batch of super-kmers to the output sink. -fn write_batch( - batch: Vec, - out: &Mutex>, - partition_bits: usize, - k: usize, - m: usize, -) -> io::Result<()> { - let mut w = out.lock().unwrap(); - let partition_mask = (1u64 << partition_bits) - 1; - for sk in batch { - let minimizer = sk - .kmer(sk.minimizer_pos() as usize, m) - .map_err(|e| std::io::Error::other(e))? - .canonical(m); - let partition = (minimizer.hash(m) & partition_mask) as usize; - write_scatter(&sk, &mut *w, k, m, partition_bits, minimizer)?; - } - Ok(()) -} - -#[inline] -fn mix64(x: u64) -> u64 { - let x = x ^ (x >> 30); - let x = x.wrapping_mul(0xbf58476d1ce4e5b9); - let x = x ^ (x >> 27); - let x = x.wrapping_mul(0x94d049bb133111eb); - x ^ (x >> 31) +fn write_batch(batch: Vec, kp: &Mutex) -> io::Result<()> { + kp.lock() + .unwrap() + .write_batch(&batch) + .map_err(|e| io::Error::other(e)) } // ── Entry point ─────────────────────────────────────────────────────────────── @@ -128,24 +109,25 @@ pub fn run(args: PartitionArgs) { let m = args.minimizer_size; let theta = args.theta; let level_max = args.level_max; - let partition_bits = args.partition_bits; let n_workers = args.threads.max(1); + let kp = KmerPartition::create(&args.output, args.partition_bits, k, m, args.force) + .unwrap_or_else(|e| { eprintln!("error: {e}"); std::process::exit(1) }); + let kp = Arc::new(Mutex::new(kp)); + let kp_sink = Arc::clone(&kp); + let paths = 
args.inputs.iter().map(PathBuf::from).collect(); let path_source = obiread::PathIter::new(paths); - let out = Arc::new(Mutex::new(BufWriter::new(io::stdout()))); - let out_sink = Arc::clone(&out); - let pipeline = make_pipeline! { PipelineData, source path_source => Path, ||? open_chunks : Path => RawChunk, |? { move |rope| normalize(rope, k) } : RawChunk => NormChunk, | { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch, - sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch, + sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch, }; WorkerPool::new(pipeline, n_workers, 1).run(); - out.lock().unwrap().flush().expect("flush error"); + kp.lock().unwrap().close().expect("close error"); } diff --git a/src/obikmer/src/cmd/superkmer.rs b/src/obikmer/src/cmd/superkmer.rs index fbe2c83..39acfeb 100644 --- a/src/obikmer/src/cmd/superkmer.rs +++ b/src/obikmer/src/cmd/superkmer.rs @@ -107,7 +107,7 @@ fn write_batch( .map_err(|e| std::io::Error::other(e))? 
.canonical(m); let partition = (minimizer.hash(m) & partition_mask) as usize; - write_scatter(&sk, &mut *w, k, m, partition_bits, minimizer)?; + write_scatter(&sk, &mut *w, k, m, partition, minimizer)?; } Ok(()) } diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index e12bf03..1e36e18 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -13,11 +13,14 @@ struct Cli { enum Commands { /// Extract super-kmers from a sequence file (scatter phase) Superkmer(cmd::superkmer::SuperkmerArgs), + /// Partition super-kmers on disk by minimizer + Partition(cmd::partition::PartitionArgs), } fn main() { let cli = Cli::parse(); match cli.command { Commands::Superkmer(args) => cmd::superkmer::run(args), + Commands::Partition(args) => cmd::partition::run(args), } } diff --git a/src/obikpartitionner/Cargo.toml b/src/obikpartitionner/Cargo.toml index 933c598..fc2c846 100644 --- a/src/obikpartitionner/Cargo.toml +++ b/src/obikpartitionner/Cargo.toml @@ -4,4 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] +niffler = "3.0.0" +obikseq = { path = "../obikseq" } obiskio = { path = "../obiskio" } +serde = { version = "1", features = ["derive"] } +serde_json = "1" diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 468b273..42e078b 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,3 +1,3 @@ -mod manager; +mod partition; -pub use manager::PartitionManager; +pub use partition::KmerPartition; diff --git a/src/obikpartitionner/src/manager.rs b/src/obikpartitionner/src/manager.rs deleted file mode 100644 index 07f0e59..0000000 --- a/src/obikpartitionner/src/manager.rs +++ /dev/null @@ -1,18 +0,0 @@ -use obiskio::SKFileWriter; -use std::path::Path; - -pub struct PartitionManager { - root_path: Box, - partitions_mask: u64, - writers: Vec, -} - -impl PartitionManager { - pub fn new(root_path: Box, n_partition_bits: usize) -> Self { - Self { - root_path, - partitions_mask: (1u64 << 
n_partition_bits) - 1, - writers: Vec::new(), - } - } -} diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs new file mode 100644 index 0000000..0b71110 --- /dev/null +++ b/src/obikpartitionner/src/partition.rs @@ -0,0 +1,196 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use niffler::Level; +use niffler::send::compression::Format; +use obikseq::superkmer::SuperKmer; +use serde::{Deserialize, Serialize}; +use obiskio::{SKFilePool, SKFileWriter, SKResult, SharedPool, create_token_with}; + +const META_FILENAME: &str = "partition.meta"; + +#[derive(Serialize, Deserialize)] +struct PartitionMeta { + n_bits: usize, + kmer_size: usize, + minimizer_size: usize, + format: String, + level: u32, +} + +pub struct KmerPartition { + root_path: PathBuf, + n_partitions: usize, + partitions_mask: u64, + kmer_size: usize, + minimizer_size: usize, + pool: SharedPool, + writers: Vec>, + format: Format, + level: Level, + closed: bool, +} + +impl KmerPartition { + pub fn create>( + path: P, + n_bits: usize, + kmer_size: usize, + minimizer_size: usize, + force: bool, + ) -> SKResult { + Self::create_with(path, n_bits, kmer_size, minimizer_size, Format::Zstd, Level::Three, force) + } + + pub fn create_with>( + path: P, + n_bits: usize, + kmer_size: usize, + minimizer_size: usize, + format: Format, + level: Level, + force: bool, + ) -> SKResult { + let root_path = path.as_ref().to_owned(); + if root_path.exists() { + if force { + fs::remove_dir_all(&root_path)?; + } else { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("{}: partition directory already exists", root_path.display()), + ) + .into()); + } + } + fs::create_dir_all(&root_path)?; + let n_partitions = 1usize << n_bits; + let pool = Arc::new(Mutex::new(SKFilePool::from_system_limits())); + let writers = (0..n_partitions).map(|_| None).collect(); + let partition = Self { + root_path, + n_partitions, + partitions_mask: 
(1u64 << n_bits) - 1, + kmer_size, + minimizer_size, + pool, + writers, + format, + level, + closed: false, + }; + partition.write_meta(n_bits)?; + Ok(partition) + } + + pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> { + self.check_not_closed()?; + let partition = self.partition_of(sk)?; + self.ensure_writer(partition)?.write(sk) + } + + pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> { + self.check_not_closed()?; + for sk in sks { + let partition = self.partition_of(sk)?; + self.ensure_writer(partition)?.write(sk)?; + } + Ok(()) + } + + pub fn flush(&mut self) -> SKResult<()> { + self.check_not_closed()?; + for writer in self.writers.iter_mut().flatten() { + writer.flush()?; + } + Ok(()) + } + + pub fn close(&mut self) -> SKResult<()> { + if self.closed { + return Ok(()); + } + self.closed = true; + for writer in self.writers.iter_mut().flatten() { + writer.close()?; + } + Ok(()) + } + + pub fn is_open(&self) -> bool { + !self.closed + } + + pub fn path(&self) -> &Path { + &self.root_path + } + + // ── private ─────────────────────────────────────────────────────────────── + + fn check_not_closed(&self) -> SKResult<()> { + if self.closed { + Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "write to closed KmerPartition", + ) + .into()) + } else { + Ok(()) + } + } + + fn partition_of(&self, sk: &SuperKmer) -> SKResult { + let minimizer = sk + .kmer(sk.minimizer_pos() as usize, self.minimizer_size) + .map_err(|e| io::Error::other(e))? 
+ .canonical(self.minimizer_size); + Ok((minimizer.hash(self.minimizer_size) & self.partitions_mask) as usize) + } + + fn write_meta(&self, n_bits: usize) -> SKResult<()> { + let meta = PartitionMeta { + n_bits, + kmer_size: self.kmer_size, + minimizer_size: self.minimizer_size, + format: match self.format { + Format::Gzip => "gzip", + Format::Bzip => "bzip2", + Format::Lzma => "lzma", + Format::Zstd => "zstd", + Format::No => "none", + } + .to_owned(), + level: u32::from(self.level), + }; + let f = fs::File::create(self.root_path.join(META_FILENAME))?; + serde_json::to_writer_pretty(f, &meta) + .map_err(|e| io::Error::other(e))?; + Ok(()) + } + + fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> { + if self.writers[partition].is_none() { + let dir = self.root_path.join(format!("part_{:05}", partition)); + fs::create_dir_all(&dir)?; + let ext = match self.format { + Format::Gzip => "skmer.gz", + Format::Bzip => "skmer.bz2", + Format::Lzma => "skmer.xz", + Format::Zstd => "skmer.zst", + Format::No => "skmer", + }; + let file_path = dir.join(format!("raw.{ext}")); + let writer = create_token_with(&self.pool, file_path, self.format, self.level)?; + self.writers[partition] = Some(writer); + } + Ok(self.writers[partition].as_mut().unwrap()) + } +} + +impl Drop for KmerPartition { + fn drop(&mut self) { + let _ = self.close(); + } +} diff --git a/src/obipipeline/src/scheduler.rs b/src/obipipeline/src/scheduler.rs index fc6625b..5cf0540 100644 --- a/src/obipipeline/src/scheduler.rs +++ b/src/obipipeline/src/scheduler.rs @@ -78,8 +78,8 @@ impl Clone for Stage { // ── Worker task ─────────────────────────────────────────────────────────────── enum WorkerTask { - Transform(D, SharedFn, Sender>), - Flat(D, SharedFlatFn, Sender>, Sender), + Transform(D, usize), + Flat(D, usize), } // ── Thread runners ──────────────────────────────────────────────────────────── @@ -121,18 +121,27 @@ where /// - `Transform` : applique `f(data)` et envoie le résultat 
dans `result_tx`. /// - `Flat` : appelle `f(data, &push_tx, &delta_tx)` ; la fonction elle-même /// pousse ses items dans `push_tx` et envoie `N-1` dans `delta_tx`. -fn transform_runner(task_rx: Receiver>) -> thread::JoinHandle<()> +fn transform_runner( + task_rx: Receiver>, + stages: Vec>, + stage_txs: Vec>>, + flat_delta_tx: Sender, +) -> thread::JoinHandle<()> where DATA: Send + Sync + 'static, { thread::spawn(move || { while let Ok(task) = task_rx.recv() { match task { - WorkerTask::Transform(data, f, result_tx) => { - let _ = result_tx.send(f(data)); + WorkerTask::Transform(data, idx) => { + if let Stage::Transform(f) = &stages[idx] { + let _ = stage_txs[idx].send(f(data)); + } } - WorkerTask::Flat(data, f, push_tx, delta_tx) => { - f(data, &push_tx, &delta_tx); + WorkerTask::Flat(data, idx) => { + if let Stage::Flat(f) = &stages[idx] { + f(data, &stage_txs[idx], &flat_delta_tx); + } } } } @@ -223,23 +232,31 @@ where let stages = self.pipeline.stages; + // ── Canal delta pour les flat stages ─────────────────────────────── + // Chaque flat worker envoie `N-1` ici après avoir poussé N items. + // Le scheduler ajuste `in_flight` en conséquence. + let (flat_delta_tx, flat_delta_rx) = bounded::(self.capacity); + // ── Worker pool ──────────────────────────────────────────────────── let (worker_tx, worker_rx): (Sender>, Receiver>) = bounded(self.capacity); for _ in 0..self.n_workers { - self.handles.push(transform_runner(worker_rx.clone())); + self.handles.push(transform_runner( + worker_rx.clone(), + stages.iter().map(Stage::clone).collect(), + stage_txs.clone(), + flat_delta_tx.clone(), + )); } + // Le scheduler ne tient plus flat_delta_tx : les workers le détiennent. + // On le drop ici pour que le canal se ferme quand les workers terminent. 
+ drop(flat_delta_tx); // ── Sink thread ──────────────────────────────────────────────────── let (sink_tx, sink_err_rx, sink_handle) = sink_runner(self.pipeline.sink, self.capacity); self.handles.push(sink_handle); - // ── Canal delta pour les flat stages ─────────────────────────────── - // Chaque flat worker envoie `N-1` ici après avoir poussé N items. - // Le scheduler ajuste `in_flight` en conséquence. - let (flat_delta_tx, flat_delta_rx) = bounded::(self.capacity); - // ── Boucle principale ────────────────────────────────────────────── // // `in_flight` (isize) = nb d'items qui doivent encore atteindre le sink. @@ -313,8 +330,8 @@ where in_flight += 1; dispatch( data, 0, - &stages, &stage_txs, &worker_tx, - &flat_delta_tx, &mut flat_workers_active, + &stages, &worker_tx, + &mut flat_workers_active, ); } } @@ -333,8 +350,8 @@ where } else { dispatch( data, stage + 1, - &stages, &stage_txs, &worker_tx, - &flat_delta_tx, &mut flat_workers_active, + &stages, &worker_tx, + &mut flat_workers_active, ); } } @@ -362,27 +379,16 @@ fn dispatch( data: DATA, stage_idx: usize, stages: &[Stage], - stage_txs: &[Sender>], worker_tx: &Sender>, - flat_delta_tx: &Sender, flat_workers_active: &mut usize, ) { match &stages[stage_idx] { - Stage::Transform(f) => { - let _ = worker_tx.send(WorkerTask::Transform( - data, - Arc::clone(f), - stage_txs[stage_idx].clone(), - )); + Stage::Transform(_) => { + let _ = worker_tx.send(WorkerTask::Transform(data, stage_idx)); } - Stage::Flat(f) => { + Stage::Flat(_) => { *flat_workers_active += 1; - let _ = worker_tx.send(WorkerTask::Flat( - data, - Arc::clone(f), - stage_txs[stage_idx].clone(), - flat_delta_tx.clone(), - )); + let _ = worker_tx.send(WorkerTask::Flat(data, stage_idx)); } } }