.gitignore: ignore zstandard-compressed files
- Add *.zst pattern to .gitignore - Prevents tracking of zstandard-compressed archives
This commit is contained in:
@@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"n_bits": 8,
|
||||||
|
"kmer_size": 31,
|
||||||
|
"minimizer_size": 11,
|
||||||
|
"format": "zstd",
|
||||||
|
"level": 3
|
||||||
|
}
|
||||||
Generated
+5
@@ -771,6 +771,7 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"obifastwrite",
|
"obifastwrite",
|
||||||
|
"obikpartitionner",
|
||||||
"obikrope",
|
"obikrope",
|
||||||
"obikseq",
|
"obikseq",
|
||||||
"obipipeline",
|
"obipipeline",
|
||||||
@@ -782,7 +783,11 @@ dependencies = [
|
|||||||
name = "obikpartitionner"
|
name = "obikpartitionner"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"niffler 3.0.0",
|
||||||
|
"obikseq",
|
||||||
"obiskio",
|
"obiskio",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@@ -15,3 +15,4 @@ obifastwrite = { path = "../obifastwrite" }
|
|||||||
obipipeline = { path = "../obipipeline" }
|
obipipeline = { path = "../obipipeline" }
|
||||||
clap = { version = "4", features = ["derive"] }
|
clap = { version = "4", features = ["derive"] }
|
||||||
obikrope = { path = "../obikrope" }
|
obikrope = { path = "../obikrope" }
|
||||||
|
obikpartitionner = { path = "../obikpartitionner" }
|
||||||
|
|||||||
@@ -1 +1,2 @@
|
|||||||
|
pub mod partition;
|
||||||
pub mod superkmer;
|
pub mod superkmer;
|
||||||
|
|||||||
@@ -1,16 +1,20 @@
|
|||||||
use std::io::{self, BufWriter, Write};
|
use std::io;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use obifastwrite::write_scatter;
|
|
||||||
use obikrope::Rope;
|
use obikrope::Rope;
|
||||||
use obikseq::superkmer::SuperKmer;
|
use obikseq::superkmer::SuperKmer;
|
||||||
use obipipeline::{WorkerPool, make_pipeline};
|
use obipipeline::{WorkerPool, make_pipeline};
|
||||||
use obiskbuilder::SuperKmerIter;
|
use obiskbuilder::SuperKmerIter;
|
||||||
|
use obikpartitionner::KmerPartition;
|
||||||
|
|
||||||
#[derive(Args)]
|
#[derive(Args)]
|
||||||
pub struct PartitionArgs {
|
pub struct PartitionArgs {
|
||||||
|
/// Output partition directory
|
||||||
|
#[arg(short, long)]
|
||||||
|
pub output: PathBuf,
|
||||||
|
|
||||||
/// Input files or directories (FASTA/FASTQ, optionally gzip-compressed)
|
/// Input files or directories (FASTA/FASTQ, optionally gzip-compressed)
|
||||||
#[arg(num_args = 1..)]
|
#[arg(num_args = 1..)]
|
||||||
pub inputs: Vec<String>,
|
pub inputs: Vec<String>,
|
||||||
@@ -35,6 +39,10 @@ pub struct PartitionArgs {
|
|||||||
#[arg(short, long, default_value_t = 8)]
|
#[arg(short, long, default_value_t = 8)]
|
||||||
pub partition_bits: usize,
|
pub partition_bits: usize,
|
||||||
|
|
||||||
|
/// Overwrite output directory if it already exists
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub force: bool,
|
||||||
|
|
||||||
/// Number of worker threads
|
/// Number of worker threads
|
||||||
#[arg(
|
#[arg(
|
||||||
short = 'T',
|
short = 'T',
|
||||||
@@ -59,8 +67,6 @@ unsafe impl Sync for PipelineData {}
|
|||||||
|
|
||||||
// ── Stage functions ───────────────────────────────────────────────────────────
|
// ── Stage functions ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/// Opens a sequence file and returns an iterator over its raw Rope chunks.
|
|
||||||
/// Chunk-level I/O errors are logged and skipped.
|
|
||||||
fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
||||||
let path_str = path
|
let path_str = path
|
||||||
.to_str()
|
.to_str()
|
||||||
@@ -75,12 +81,10 @@ fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Normalises a raw sequence chunk (FASTA or FASTQ) into a compact ACGT/NUL rope.
|
|
||||||
fn normalize(rope: Rope, k: usize) -> io::Result<Rope> {
|
fn normalize(rope: Rope, k: usize) -> io::Result<Rope> {
|
||||||
obiread::normalize_sequence_chunk(rope, k)
|
obiread::normalize_sequence_chunk(rope, k)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extracts all super-kmers from a normalised rope.
|
|
||||||
fn build_superkmers(
|
fn build_superkmers(
|
||||||
rope: Rope,
|
rope: Rope,
|
||||||
k: usize,
|
k: usize,
|
||||||
@@ -91,34 +95,11 @@ fn build_superkmers(
|
|||||||
SuperKmerIter::new(&rope, k, m, level_max, theta).collect()
|
SuperKmerIter::new(&rope, k, m, level_max, theta).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Writes a batch of super-kmers to the output sink.
|
fn write_batch(batch: Vec<SuperKmer>, kp: &Mutex<KmerPartition>) -> io::Result<()> {
|
||||||
fn write_batch(
|
kp.lock()
|
||||||
batch: Vec<SuperKmer>,
|
.unwrap()
|
||||||
out: &Mutex<BufWriter<io::Stdout>>,
|
.write_batch(&batch)
|
||||||
partition_bits: usize,
|
.map_err(|e| io::Error::other(e))
|
||||||
k: usize,
|
|
||||||
m: usize,
|
|
||||||
) -> io::Result<()> {
|
|
||||||
let mut w = out.lock().unwrap();
|
|
||||||
let partition_mask = (1u64 << partition_bits) - 1;
|
|
||||||
for sk in batch {
|
|
||||||
let minimizer = sk
|
|
||||||
.kmer(sk.minimizer_pos() as usize, m)
|
|
||||||
.map_err(|e| std::io::Error::other(e))?
|
|
||||||
.canonical(m);
|
|
||||||
let partition = (minimizer.hash(m) & partition_mask) as usize;
|
|
||||||
write_scatter(&sk, &mut *w, k, m, partition_bits, minimizer)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn mix64(x: u64) -> u64 {
|
|
||||||
let x = x ^ (x >> 30);
|
|
||||||
let x = x.wrapping_mul(0xbf58476d1ce4e5b9);
|
|
||||||
let x = x ^ (x >> 27);
|
|
||||||
let x = x.wrapping_mul(0x94d049bb133111eb);
|
|
||||||
x ^ (x >> 31)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Entry point ───────────────────────────────────────────────────────────────
|
// ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
@@ -128,24 +109,25 @@ pub fn run(args: PartitionArgs) {
|
|||||||
let m = args.minimizer_size;
|
let m = args.minimizer_size;
|
||||||
let theta = args.theta;
|
let theta = args.theta;
|
||||||
let level_max = args.level_max;
|
let level_max = args.level_max;
|
||||||
let partition_bits = args.partition_bits;
|
|
||||||
let n_workers = args.threads.max(1);
|
let n_workers = args.threads.max(1);
|
||||||
|
|
||||||
|
let kp = KmerPartition::create(&args.output, args.partition_bits, k, m, args.force)
|
||||||
|
.unwrap_or_else(|e| { eprintln!("error: {e}"); std::process::exit(1) });
|
||||||
|
let kp = Arc::new(Mutex::new(kp));
|
||||||
|
let kp_sink = Arc::clone(&kp);
|
||||||
|
|
||||||
let paths = args.inputs.iter().map(PathBuf::from).collect();
|
let paths = args.inputs.iter().map(PathBuf::from).collect();
|
||||||
let path_source = obiread::PathIter::new(paths);
|
let path_source = obiread::PathIter::new(paths);
|
||||||
|
|
||||||
let out = Arc::new(Mutex::new(BufWriter::new(io::stdout())));
|
|
||||||
let out_sink = Arc::clone(&out);
|
|
||||||
|
|
||||||
let pipeline = make_pipeline! {
|
let pipeline = make_pipeline! {
|
||||||
PipelineData,
|
PipelineData,
|
||||||
source path_source => Path,
|
source path_source => Path,
|
||||||
||? open_chunks : Path => RawChunk,
|
||? open_chunks : Path => RawChunk,
|
||||||
|? { move |rope| normalize(rope, k) } : RawChunk => NormChunk,
|
|? { move |rope| normalize(rope, k) } : RawChunk => NormChunk,
|
||||||
| { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
|
| { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
|
||||||
sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch,
|
sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch,
|
||||||
};
|
};
|
||||||
|
|
||||||
WorkerPool::new(pipeline, n_workers, 1).run();
|
WorkerPool::new(pipeline, n_workers, 1).run();
|
||||||
out.lock().unwrap().flush().expect("flush error");
|
kp.lock().unwrap().close().expect("close error");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ fn write_batch(
|
|||||||
.map_err(|e| std::io::Error::other(e))?
|
.map_err(|e| std::io::Error::other(e))?
|
||||||
.canonical(m);
|
.canonical(m);
|
||||||
let partition = (minimizer.hash(m) & partition_mask) as usize;
|
let partition = (minimizer.hash(m) & partition_mask) as usize;
|
||||||
write_scatter(&sk, &mut *w, k, m, partition_bits, minimizer)?;
|
write_scatter(&sk, &mut *w, k, m, partition, minimizer)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,11 +13,14 @@ struct Cli {
|
|||||||
enum Commands {
|
enum Commands {
|
||||||
/// Extract super-kmers from a sequence file (scatter phase)
|
/// Extract super-kmers from a sequence file (scatter phase)
|
||||||
Superkmer(cmd::superkmer::SuperkmerArgs),
|
Superkmer(cmd::superkmer::SuperkmerArgs),
|
||||||
|
/// Partition super-kmers on disk by minimizer
|
||||||
|
Partition(cmd::partition::PartitionArgs),
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
match cli.command {
|
match cli.command {
|
||||||
Commands::Superkmer(args) => cmd::superkmer::run(args),
|
Commands::Superkmer(args) => cmd::superkmer::run(args),
|
||||||
|
Commands::Partition(args) => cmd::partition::run(args),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,4 +4,8 @@ version = "0.1.0"
|
|||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
niffler = "3.0.0"
|
||||||
|
obikseq = { path = "../obikseq" }
|
||||||
obiskio = { path = "../obiskio" }
|
obiskio = { path = "../obiskio" }
|
||||||
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
mod manager;
|
mod partition;
|
||||||
|
|
||||||
pub use manager::PartitionManager;
|
pub use partition::KmerPartition;
|
||||||
|
|||||||
@@ -1,18 +0,0 @@
|
|||||||
use obiskio::SKFileWriter;
|
|
||||||
use std::path::Path;
|
|
||||||
|
|
||||||
pub struct PartitionManager {
|
|
||||||
root_path: Box<Path>,
|
|
||||||
partitions_mask: u64,
|
|
||||||
writers: Vec<SKFileWriter>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PartitionManager {
|
|
||||||
pub fn new(root_path: Box<Path>, n_partition_bits: usize) -> Self {
|
|
||||||
Self {
|
|
||||||
root_path,
|
|
||||||
partitions_mask: (1u64 << n_partition_bits) - 1,
|
|
||||||
writers: Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,196 @@
|
|||||||
|
use std::fs;
|
||||||
|
use std::io;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use niffler::Level;
|
||||||
|
use niffler::send::compression::Format;
|
||||||
|
use obikseq::superkmer::SuperKmer;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use obiskio::{SKFilePool, SKFileWriter, SKResult, SharedPool, create_token_with};
|
||||||
|
|
||||||
|
const META_FILENAME: &str = "partition.meta";
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct PartitionMeta {
|
||||||
|
n_bits: usize,
|
||||||
|
kmer_size: usize,
|
||||||
|
minimizer_size: usize,
|
||||||
|
format: String,
|
||||||
|
level: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct KmerPartition {
|
||||||
|
root_path: PathBuf,
|
||||||
|
n_partitions: usize,
|
||||||
|
partitions_mask: u64,
|
||||||
|
kmer_size: usize,
|
||||||
|
minimizer_size: usize,
|
||||||
|
pool: SharedPool,
|
||||||
|
writers: Vec<Option<SKFileWriter>>,
|
||||||
|
format: Format,
|
||||||
|
level: Level,
|
||||||
|
closed: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl KmerPartition {
|
||||||
|
pub fn create<P: AsRef<Path>>(
|
||||||
|
path: P,
|
||||||
|
n_bits: usize,
|
||||||
|
kmer_size: usize,
|
||||||
|
minimizer_size: usize,
|
||||||
|
force: bool,
|
||||||
|
) -> SKResult<Self> {
|
||||||
|
Self::create_with(path, n_bits, kmer_size, minimizer_size, Format::Zstd, Level::Three, force)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_with<P: AsRef<Path>>(
|
||||||
|
path: P,
|
||||||
|
n_bits: usize,
|
||||||
|
kmer_size: usize,
|
||||||
|
minimizer_size: usize,
|
||||||
|
format: Format,
|
||||||
|
level: Level,
|
||||||
|
force: bool,
|
||||||
|
) -> SKResult<Self> {
|
||||||
|
let root_path = path.as_ref().to_owned();
|
||||||
|
if root_path.exists() {
|
||||||
|
if force {
|
||||||
|
fs::remove_dir_all(&root_path)?;
|
||||||
|
} else {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::AlreadyExists,
|
||||||
|
format!("{}: partition directory already exists", root_path.display()),
|
||||||
|
)
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fs::create_dir_all(&root_path)?;
|
||||||
|
let n_partitions = 1usize << n_bits;
|
||||||
|
let pool = Arc::new(Mutex::new(SKFilePool::from_system_limits()));
|
||||||
|
let writers = (0..n_partitions).map(|_| None).collect();
|
||||||
|
let partition = Self {
|
||||||
|
root_path,
|
||||||
|
n_partitions,
|
||||||
|
partitions_mask: (1u64 << n_bits) - 1,
|
||||||
|
kmer_size,
|
||||||
|
minimizer_size,
|
||||||
|
pool,
|
||||||
|
writers,
|
||||||
|
format,
|
||||||
|
level,
|
||||||
|
closed: false,
|
||||||
|
};
|
||||||
|
partition.write_meta(n_bits)?;
|
||||||
|
Ok(partition)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> {
|
||||||
|
self.check_not_closed()?;
|
||||||
|
let partition = self.partition_of(sk)?;
|
||||||
|
self.ensure_writer(partition)?.write(sk)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> {
|
||||||
|
self.check_not_closed()?;
|
||||||
|
for sk in sks {
|
||||||
|
let partition = self.partition_of(sk)?;
|
||||||
|
self.ensure_writer(partition)?.write(sk)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flush(&mut self) -> SKResult<()> {
|
||||||
|
self.check_not_closed()?;
|
||||||
|
for writer in self.writers.iter_mut().flatten() {
|
||||||
|
writer.flush()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close(&mut self) -> SKResult<()> {
|
||||||
|
if self.closed {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.closed = true;
|
||||||
|
for writer in self.writers.iter_mut().flatten() {
|
||||||
|
writer.close()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_open(&self) -> bool {
|
||||||
|
!self.closed
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn path(&self) -> &Path {
|
||||||
|
&self.root_path
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── private ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
fn check_not_closed(&self) -> SKResult<()> {
|
||||||
|
if self.closed {
|
||||||
|
Err(io::Error::new(
|
||||||
|
io::ErrorKind::BrokenPipe,
|
||||||
|
"write to closed KmerPartition",
|
||||||
|
)
|
||||||
|
.into())
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn partition_of(&self, sk: &SuperKmer) -> SKResult<usize> {
|
||||||
|
let minimizer = sk
|
||||||
|
.kmer(sk.minimizer_pos() as usize, self.minimizer_size)
|
||||||
|
.map_err(|e| io::Error::other(e))?
|
||||||
|
.canonical(self.minimizer_size);
|
||||||
|
Ok((minimizer.hash(self.minimizer_size) & self.partitions_mask) as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_meta(&self, n_bits: usize) -> SKResult<()> {
|
||||||
|
let meta = PartitionMeta {
|
||||||
|
n_bits,
|
||||||
|
kmer_size: self.kmer_size,
|
||||||
|
minimizer_size: self.minimizer_size,
|
||||||
|
format: match self.format {
|
||||||
|
Format::Gzip => "gzip",
|
||||||
|
Format::Bzip => "bzip2",
|
||||||
|
Format::Lzma => "lzma",
|
||||||
|
Format::Zstd => "zstd",
|
||||||
|
Format::No => "none",
|
||||||
|
}
|
||||||
|
.to_owned(),
|
||||||
|
level: u32::from(self.level),
|
||||||
|
};
|
||||||
|
let f = fs::File::create(self.root_path.join(META_FILENAME))?;
|
||||||
|
serde_json::to_writer_pretty(f, &meta)
|
||||||
|
.map_err(|e| io::Error::other(e))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> {
|
||||||
|
if self.writers[partition].is_none() {
|
||||||
|
let dir = self.root_path.join(format!("part_{:05}", partition));
|
||||||
|
fs::create_dir_all(&dir)?;
|
||||||
|
let ext = match self.format {
|
||||||
|
Format::Gzip => "skmer.gz",
|
||||||
|
Format::Bzip => "skmer.bz2",
|
||||||
|
Format::Lzma => "skmer.xz",
|
||||||
|
Format::Zstd => "skmer.zst",
|
||||||
|
Format::No => "skmer",
|
||||||
|
};
|
||||||
|
let file_path = dir.join(format!("raw.{ext}"));
|
||||||
|
let writer = create_token_with(&self.pool, file_path, self.format, self.level)?;
|
||||||
|
self.writers[partition] = Some(writer);
|
||||||
|
}
|
||||||
|
Ok(self.writers[partition].as_mut().unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for KmerPartition {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let _ = self.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -78,8 +78,8 @@ impl<D> Clone for Stage<D> {
|
|||||||
// ── Worker task ───────────────────────────────────────────────────────────────
|
// ── Worker task ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
enum WorkerTask<D> {
|
enum WorkerTask<D> {
|
||||||
Transform(D, SharedFn<D>, Sender<Result<D, PipelineError>>),
|
Transform(D, usize),
|
||||||
Flat(D, SharedFlatFn<D>, Sender<Result<D, PipelineError>>, Sender<isize>),
|
Flat(D, usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Thread runners ────────────────────────────────────────────────────────────
|
// ── Thread runners ────────────────────────────────────────────────────────────
|
||||||
@@ -121,18 +121,27 @@ where
|
|||||||
/// - `Transform` : applique `f(data)` et envoie le résultat dans `result_tx`.
|
/// - `Transform` : applique `f(data)` et envoie le résultat dans `result_tx`.
|
||||||
/// - `Flat` : appelle `f(data, &push_tx, &delta_tx)` ; la fonction elle-même
|
/// - `Flat` : appelle `f(data, &push_tx, &delta_tx)` ; la fonction elle-même
|
||||||
/// pousse ses items dans `push_tx` et envoie `N-1` dans `delta_tx`.
|
/// pousse ses items dans `push_tx` et envoie `N-1` dans `delta_tx`.
|
||||||
fn transform_runner<DATA>(task_rx: Receiver<WorkerTask<DATA>>) -> thread::JoinHandle<()>
|
fn transform_runner<DATA>(
|
||||||
|
task_rx: Receiver<WorkerTask<DATA>>,
|
||||||
|
stages: Vec<Stage<DATA>>,
|
||||||
|
stage_txs: Vec<Sender<Result<DATA, PipelineError>>>,
|
||||||
|
flat_delta_tx: Sender<isize>,
|
||||||
|
) -> thread::JoinHandle<()>
|
||||||
where
|
where
|
||||||
DATA: Send + Sync + 'static,
|
DATA: Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
while let Ok(task) = task_rx.recv() {
|
while let Ok(task) = task_rx.recv() {
|
||||||
match task {
|
match task {
|
||||||
WorkerTask::Transform(data, f, result_tx) => {
|
WorkerTask::Transform(data, idx) => {
|
||||||
let _ = result_tx.send(f(data));
|
if let Stage::Transform(f) = &stages[idx] {
|
||||||
|
let _ = stage_txs[idx].send(f(data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WorkerTask::Flat(data, idx) => {
|
||||||
|
if let Stage::Flat(f) = &stages[idx] {
|
||||||
|
f(data, &stage_txs[idx], &flat_delta_tx);
|
||||||
}
|
}
|
||||||
WorkerTask::Flat(data, f, push_tx, delta_tx) => {
|
|
||||||
f(data, &push_tx, &delta_tx);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -223,23 +232,31 @@ where
|
|||||||
|
|
||||||
let stages = self.pipeline.stages;
|
let stages = self.pipeline.stages;
|
||||||
|
|
||||||
|
// ── Canal delta pour les flat stages ───────────────────────────────
|
||||||
|
// Chaque flat worker envoie `N-1` ici après avoir poussé N items.
|
||||||
|
// Le scheduler ajuste `in_flight` en conséquence.
|
||||||
|
let (flat_delta_tx, flat_delta_rx) = bounded::<isize>(self.capacity);
|
||||||
|
|
||||||
// ── Worker pool ────────────────────────────────────────────────────
|
// ── Worker pool ────────────────────────────────────────────────────
|
||||||
let (worker_tx, worker_rx): (Sender<WorkerTask<DATA>>, Receiver<WorkerTask<DATA>>) =
|
let (worker_tx, worker_rx): (Sender<WorkerTask<DATA>>, Receiver<WorkerTask<DATA>>) =
|
||||||
bounded(self.capacity);
|
bounded(self.capacity);
|
||||||
|
|
||||||
for _ in 0..self.n_workers {
|
for _ in 0..self.n_workers {
|
||||||
self.handles.push(transform_runner(worker_rx.clone()));
|
self.handles.push(transform_runner(
|
||||||
|
worker_rx.clone(),
|
||||||
|
stages.iter().map(Stage::clone).collect(),
|
||||||
|
stage_txs.clone(),
|
||||||
|
flat_delta_tx.clone(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
// Le scheduler ne tient plus flat_delta_tx : les workers le détiennent.
|
||||||
|
// On le drop ici pour que le canal se ferme quand les workers terminent.
|
||||||
|
drop(flat_delta_tx);
|
||||||
|
|
||||||
// ── Sink thread ────────────────────────────────────────────────────
|
// ── Sink thread ────────────────────────────────────────────────────
|
||||||
let (sink_tx, sink_err_rx, sink_handle) = sink_runner(self.pipeline.sink, self.capacity);
|
let (sink_tx, sink_err_rx, sink_handle) = sink_runner(self.pipeline.sink, self.capacity);
|
||||||
self.handles.push(sink_handle);
|
self.handles.push(sink_handle);
|
||||||
|
|
||||||
// ── Canal delta pour les flat stages ───────────────────────────────
|
|
||||||
// Chaque flat worker envoie `N-1` ici après avoir poussé N items.
|
|
||||||
// Le scheduler ajuste `in_flight` en conséquence.
|
|
||||||
let (flat_delta_tx, flat_delta_rx) = bounded::<isize>(self.capacity);
|
|
||||||
|
|
||||||
// ── Boucle principale ──────────────────────────────────────────────
|
// ── Boucle principale ──────────────────────────────────────────────
|
||||||
//
|
//
|
||||||
// `in_flight` (isize) = nb d'items qui doivent encore atteindre le sink.
|
// `in_flight` (isize) = nb d'items qui doivent encore atteindre le sink.
|
||||||
@@ -313,8 +330,8 @@ where
|
|||||||
in_flight += 1;
|
in_flight += 1;
|
||||||
dispatch(
|
dispatch(
|
||||||
data, 0,
|
data, 0,
|
||||||
&stages, &stage_txs, &worker_tx,
|
&stages, &worker_tx,
|
||||||
&flat_delta_tx, &mut flat_workers_active,
|
&mut flat_workers_active,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -333,8 +350,8 @@ where
|
|||||||
} else {
|
} else {
|
||||||
dispatch(
|
dispatch(
|
||||||
data, stage + 1,
|
data, stage + 1,
|
||||||
&stages, &stage_txs, &worker_tx,
|
&stages, &worker_tx,
|
||||||
&flat_delta_tx, &mut flat_workers_active,
|
&mut flat_workers_active,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -362,27 +379,16 @@ fn dispatch<DATA>(
|
|||||||
data: DATA,
|
data: DATA,
|
||||||
stage_idx: usize,
|
stage_idx: usize,
|
||||||
stages: &[Stage<DATA>],
|
stages: &[Stage<DATA>],
|
||||||
stage_txs: &[Sender<Result<DATA, PipelineError>>],
|
|
||||||
worker_tx: &Sender<WorkerTask<DATA>>,
|
worker_tx: &Sender<WorkerTask<DATA>>,
|
||||||
flat_delta_tx: &Sender<isize>,
|
|
||||||
flat_workers_active: &mut usize,
|
flat_workers_active: &mut usize,
|
||||||
) {
|
) {
|
||||||
match &stages[stage_idx] {
|
match &stages[stage_idx] {
|
||||||
Stage::Transform(f) => {
|
Stage::Transform(_) => {
|
||||||
let _ = worker_tx.send(WorkerTask::Transform(
|
let _ = worker_tx.send(WorkerTask::Transform(data, stage_idx));
|
||||||
data,
|
|
||||||
Arc::clone(f),
|
|
||||||
stage_txs[stage_idx].clone(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
Stage::Flat(f) => {
|
Stage::Flat(_) => {
|
||||||
*flat_workers_active += 1;
|
*flat_workers_active += 1;
|
||||||
let _ = worker_tx.send(WorkerTask::Flat(
|
let _ = worker_tx.send(WorkerTask::Flat(data, stage_idx));
|
||||||
data,
|
|
||||||
Arc::clone(f),
|
|
||||||
stage_txs[stage_idx].clone(),
|
|
||||||
flat_delta_tx.clone(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user