✨ add PhantomData import for generic type safety
- Added `use std::marker::PhantomData;` to prepare for generic scheduler implementations - Ensures type safety and avoids unused lifetime/type parameters warnings
This commit is contained in:
@@ -7,3 +7,5 @@ data-stress
|
||||
*.pb
|
||||
*.json
|
||||
*.bin
|
||||
*.bin
|
||||
*.json
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use clap::Args;
|
||||
use obikrope::Rope;
|
||||
use obikseq::superkmer::SuperKmer;
|
||||
|
||||
// ── Shared arguments ──────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Args)]
|
||||
pub struct CommonArgs {
|
||||
/// Input files or directories (FASTA/FASTQ, optionally gzip-compressed)
|
||||
#[arg(num_args = 1..)]
|
||||
pub inputs: Vec<String>,
|
||||
|
||||
/// k-mer size
|
||||
#[arg(short, long, default_value_t = 31)]
|
||||
pub kmer_size: usize,
|
||||
|
||||
/// Minimizer size
|
||||
#[arg(short, long, default_value_t = 11)]
|
||||
pub minimizer_size: usize,
|
||||
|
||||
/// Entropy threshold (k-mers with score ≤ theta are rejected)
|
||||
#[arg(long, default_value_t = 0.7)]
|
||||
pub theta: f64,
|
||||
|
||||
/// Maximum sub-word size for entropy computation
|
||||
#[arg(long, default_value_t = 6)]
|
||||
pub level_max: usize,
|
||||
|
||||
/// Number of bits to encode partitions (allows up to 2^partition_bits partitions)
|
||||
#[arg(short, long, default_value_t = 8)]
|
||||
pub partition_bits: usize,
|
||||
|
||||
/// Number of worker threads
|
||||
#[arg(
|
||||
short = 'T',
|
||||
long,
|
||||
default_value_t = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1)
|
||||
)]
|
||||
pub threads: usize,
|
||||
}
|
||||
|
||||
// ── Pipeline data carrier ─────────────────────────────────────────────────────
|
||||
|
||||
pub enum PipelineData {
|
||||
Path(PathBuf),
|
||||
RawChunk(Rope),
|
||||
NormChunk(Rope),
|
||||
Batch(Vec<SuperKmer>),
|
||||
}
|
||||
|
||||
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline ownership transfers
|
||||
// exclusively through channels — no item is ever shared across threads.
|
||||
unsafe impl Send for PipelineData {}
|
||||
unsafe impl Sync for PipelineData {}
|
||||
|
||||
// ── I/O plumbing ──────────────────────────────────────────────────────────────
|
||||
|
||||
pub fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
||||
let path_str = path
|
||||
.to_str()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?;
|
||||
let iter = obiread::read_sequence_chunks(path_str)?;
|
||||
Ok(iter.filter_map(|r| match r {
|
||||
Ok(rope) => Some(rope),
|
||||
Err(e) => {
|
||||
eprintln!("chunk read error: {e}");
|
||||
None
|
||||
}
|
||||
}))
|
||||
}
|
||||
@@ -1,117 +1,47 @@
|
||||
use clap::Args;
|
||||
use obikpartitionner::KmerPartition;
|
||||
use obikrope::Rope;
|
||||
use obikseq::superkmer::SuperKmer;
|
||||
use obipipeline::{WorkerPool, make_pipeline};
|
||||
use obiskbuilder::SuperKmerIter;
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tracing::info;
|
||||
|
||||
use crate::cli::{CommonArgs, PipelineData, open_chunks};
|
||||
|
||||
#[derive(Args)]
|
||||
pub struct PartitionArgs {
|
||||
/// Output partition directory
|
||||
#[arg(short, long)]
|
||||
pub output: PathBuf,
|
||||
|
||||
/// Input files or directories (FASTA/FASTQ, optionally gzip-compressed)
|
||||
#[arg(num_args = 1..)]
|
||||
pub inputs: Vec<String>,
|
||||
|
||||
/// k-mer size
|
||||
#[arg(short, long, default_value_t = 31)]
|
||||
pub kmer_size: usize,
|
||||
|
||||
/// Minimizer size
|
||||
#[arg(short, long, default_value_t = 11)]
|
||||
pub minimizer_size: usize,
|
||||
|
||||
/// Entropy threshold (k-mers with score ≤ theta are rejected)
|
||||
#[arg(long, default_value_t = 0.7)]
|
||||
pub theta: f64,
|
||||
|
||||
/// Maximum sub-word size for entropy computation
|
||||
#[arg(long, default_value_t = 6)]
|
||||
pub level_max: usize,
|
||||
|
||||
/// Number of bits to encode partitions (allows up to 2^partition_bits partitions)
|
||||
#[arg(short, long, default_value_t = 8)]
|
||||
pub partition_bits: usize,
|
||||
|
||||
/// Overwrite output directory if it already exists
|
||||
#[arg(long, default_value_t = false)]
|
||||
pub force: bool,
|
||||
|
||||
/// Number of worker threads
|
||||
#[arg(
|
||||
short = 'T',
|
||||
long,
|
||||
default_value_t = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1))]
|
||||
pub threads: usize,
|
||||
#[command(flatten)]
|
||||
pub common: CommonArgs,
|
||||
}
|
||||
|
||||
enum PipelineData {
|
||||
Path(PathBuf),
|
||||
RawChunk(Rope),
|
||||
NormChunk(Rope),
|
||||
Batch(Vec<SuperKmer>),
|
||||
}
|
||||
|
||||
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline ownership transfers
|
||||
// exclusively through channels — no item is ever shared across threads.
|
||||
unsafe impl Send for PipelineData {}
|
||||
unsafe impl Sync for PipelineData {}
|
||||
|
||||
// ── Stage functions ───────────────────────────────────────────────────────────
|
||||
|
||||
fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
||||
let path_str = path
|
||||
.to_str()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?;
|
||||
let iter = obiread::read_sequence_chunks(path_str)?;
|
||||
Ok(iter.filter_map(|r| match r {
|
||||
Ok(rope) => Some(rope),
|
||||
Err(e) => {
|
||||
eprintln!("chunk read error: {e}");
|
||||
None
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn normalize(rope: Rope, k: usize) -> io::Result<Rope> {
|
||||
obiread::normalize_sequence_chunk(rope, k)
|
||||
}
|
||||
|
||||
fn build_superkmers(
|
||||
rope: Rope,
|
||||
k: usize,
|
||||
m: usize,
|
||||
level_max: usize,
|
||||
theta: f64,
|
||||
) -> Vec<SuperKmer> {
|
||||
SuperKmerIter::new(&rope, k, m, level_max, theta).collect()
|
||||
}
|
||||
|
||||
fn write_batch(mut batch: Vec<SuperKmer>, kp: &Mutex<KmerPartition>) -> io::Result<()> {
|
||||
kp.lock()
|
||||
.unwrap()
|
||||
.write_batch(&mut batch)
|
||||
.map_err(|e| io::Error::other(e))
|
||||
.map_err(io::Error::other)
|
||||
}
|
||||
|
||||
// ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
pub fn run(args: PartitionArgs) {
|
||||
let k = args.kmer_size;
|
||||
let m = args.minimizer_size;
|
||||
let theta = args.theta;
|
||||
let level_max = args.level_max;
|
||||
let n_workers = args.threads.max(1);
|
||||
let k = args.common.kmer_size;
|
||||
let m = args.common.minimizer_size;
|
||||
let theta = args.common.theta;
|
||||
let level_max = args.common.level_max;
|
||||
let n_workers = args.common.threads.max(1);
|
||||
|
||||
let kp = KmerPartition::create(&args.output, args.partition_bits, k, m, args.force)
|
||||
let kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force)
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("error: {e}");
|
||||
std::process::exit(1)
|
||||
@@ -119,16 +49,16 @@ pub fn run(args: PartitionArgs) {
|
||||
let kp = Arc::new(Mutex::new(kp));
|
||||
let kp_sink = Arc::clone(&kp);
|
||||
|
||||
let paths = args.inputs.iter().map(PathBuf::from).collect();
|
||||
let paths = args.common.inputs.iter().map(PathBuf::from).collect();
|
||||
let path_source = obiread::PathIter::new(paths);
|
||||
|
||||
let pipeline = make_pipeline! {
|
||||
PipelineData,
|
||||
source path_source => Path,
|
||||
||? open_chunks : Path => RawChunk,
|
||||
|? { move |rope| normalize(rope, k) } : RawChunk => NormChunk,
|
||||
| { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
|
||||
sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch,
|
||||
source path_source => Path,
|
||||
||? open_chunks : Path => RawChunk,
|
||||
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
|
||||
sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch,
|
||||
};
|
||||
|
||||
WorkerPool::new(pipeline, n_workers, 1).run();
|
||||
|
||||
@@ -4,94 +4,19 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use clap::Args;
|
||||
use obifastwrite::write_scatter;
|
||||
use obikrope::Rope;
|
||||
use obikseq::superkmer::SuperKmer;
|
||||
use obipipeline::{WorkerPool, make_pipeline};
|
||||
use obiskbuilder::SuperKmerIter;
|
||||
|
||||
use crate::cli::{CommonArgs, PipelineData, open_chunks};
|
||||
|
||||
#[derive(Args)]
|
||||
pub struct SuperkmerArgs {
|
||||
/// Input files or directories (FASTA/FASTQ, optionally gzip-compressed)
|
||||
#[arg(num_args = 1..)]
|
||||
pub inputs: Vec<String>,
|
||||
|
||||
/// k-mer size
|
||||
#[arg(short, long, default_value_t = 31)]
|
||||
pub kmer_size: usize,
|
||||
|
||||
/// Minimizer size
|
||||
#[arg(short, long, default_value_t = 11)]
|
||||
pub minimizer_size: usize,
|
||||
|
||||
/// Entropy threshold (k-mers with score ≤ theta are rejected)
|
||||
#[arg(long, default_value_t = 0.7)]
|
||||
pub theta: f64,
|
||||
|
||||
/// Maximum sub-word size for entropy computation
|
||||
#[arg(long, default_value_t = 6)]
|
||||
pub level_max: usize,
|
||||
|
||||
/// Number of bits to encode partitions (allows up to 2^partition_bits partitions)
|
||||
#[arg(short, long, default_value_t = 8)]
|
||||
pub partition_bits: usize,
|
||||
|
||||
/// Number of worker threads
|
||||
#[arg(
|
||||
short = 'T',
|
||||
long,
|
||||
default_value_t = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1))]
|
||||
pub threads: usize,
|
||||
#[command(flatten)]
|
||||
pub common: CommonArgs,
|
||||
}
|
||||
|
||||
enum PipelineData {
|
||||
Path(PathBuf),
|
||||
RawChunk(Rope),
|
||||
NormChunk(Rope),
|
||||
Batch(Vec<SuperKmer>),
|
||||
}
|
||||
|
||||
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline ownership transfers
|
||||
// exclusively through channels — no item is ever shared across threads.
|
||||
unsafe impl Send for PipelineData {}
|
||||
unsafe impl Sync for PipelineData {}
|
||||
|
||||
// ── Stage functions ───────────────────────────────────────────────────────────
|
||||
|
||||
/// Opens a sequence file and returns an iterator over its raw Rope chunks.
|
||||
/// Chunk-level I/O errors are logged and skipped.
|
||||
fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
||||
let path_str = path
|
||||
.to_str()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?;
|
||||
let iter = obiread::read_sequence_chunks(path_str)?;
|
||||
Ok(iter.filter_map(|r| match r {
|
||||
Ok(rope) => Some(rope),
|
||||
Err(e) => {
|
||||
eprintln!("chunk read error: {e}");
|
||||
None
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
/// Normalises a raw sequence chunk (FASTA or FASTQ) into a compact ACGT/NUL rope.
|
||||
fn normalize(rope: Rope, k: usize) -> io::Result<Rope> {
|
||||
obiread::normalize_sequence_chunk(rope, k)
|
||||
}
|
||||
|
||||
/// Extracts all super-kmers from a normalised rope.
|
||||
fn build_superkmers(
|
||||
rope: Rope,
|
||||
k: usize,
|
||||
m: usize,
|
||||
level_max: usize,
|
||||
theta: f64,
|
||||
) -> Vec<SuperKmer> {
|
||||
SuperKmerIter::new(&rope, k, m, level_max, theta).collect()
|
||||
}
|
||||
|
||||
/// Writes a batch of super-kmers to the output sink.
|
||||
fn write_batch(
|
||||
batch: Vec<SuperKmer>,
|
||||
out: &Mutex<BufWriter<io::Stdout>>,
|
||||
@@ -104,7 +29,7 @@ fn write_batch(
|
||||
for sk in batch {
|
||||
let minimizer = sk
|
||||
.kmer(sk.minimizer_pos() as usize, m)
|
||||
.map_err(|e| std::io::Error::other(e))?
|
||||
.map_err(io::Error::other)?
|
||||
.canonical(m);
|
||||
let partition = (minimizer.hash(m) & partition_mask) as usize;
|
||||
write_scatter(&sk, &mut *w, k, m, partition, minimizer)?;
|
||||
@@ -112,26 +37,17 @@ fn write_batch(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn mix64(x: u64) -> u64 {
|
||||
let x = x ^ (x >> 30);
|
||||
let x = x.wrapping_mul(0xbf58476d1ce4e5b9);
|
||||
let x = x ^ (x >> 27);
|
||||
let x = x.wrapping_mul(0x94d049bb133111eb);
|
||||
x ^ (x >> 31)
|
||||
}
|
||||
|
||||
// ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
pub fn run(args: SuperkmerArgs) {
|
||||
let k = args.kmer_size;
|
||||
let m = args.minimizer_size;
|
||||
let theta = args.theta;
|
||||
let level_max = args.level_max;
|
||||
let partition_bits = args.partition_bits;
|
||||
let n_workers = args.threads.max(1);
|
||||
let k = args.common.kmer_size;
|
||||
let m = args.common.minimizer_size;
|
||||
let theta = args.common.theta;
|
||||
let level_max = args.common.level_max;
|
||||
let partition_bits = args.common.partition_bits;
|
||||
let n_workers = args.common.threads.max(1);
|
||||
|
||||
let paths = args.inputs.iter().map(PathBuf::from).collect();
|
||||
let paths = args.common.inputs.iter().map(PathBuf::from).collect();
|
||||
let path_source = obiread::PathIter::new(paths);
|
||||
|
||||
let out = Arc::new(Mutex::new(BufWriter::new(io::stdout())));
|
||||
@@ -139,11 +55,11 @@ pub fn run(args: SuperkmerArgs) {
|
||||
|
||||
let pipeline = make_pipeline! {
|
||||
PipelineData,
|
||||
source path_source => Path,
|
||||
||? open_chunks : Path => RawChunk,
|
||||
|? { move |rope| normalize(rope, k) } : RawChunk => NormChunk,
|
||||
| { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
|
||||
sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch,
|
||||
source path_source => Path,
|
||||
||? open_chunks : Path => RawChunk,
|
||||
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
|
||||
sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch,
|
||||
};
|
||||
|
||||
WorkerPool::new(pipeline, n_workers, 1).run();
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
mod cli;
|
||||
mod cmd;
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
@@ -22,7 +23,7 @@ enum Commands {
|
||||
|
||||
fn main() {
|
||||
fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
|
||||
.with_writer(std::io::stderr)
|
||||
.init();
|
||||
|
||||
|
||||
@@ -20,13 +20,13 @@ use rayon::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const META_FILENAME: &str = "partition.meta";
|
||||
const SK_EXT: &str = "skmer.zst";
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct PartitionMeta {
|
||||
n_bits: usize,
|
||||
kmer_size: usize,
|
||||
minimizer_size: usize,
|
||||
format: String,
|
||||
level: u32,
|
||||
}
|
||||
|
||||
@@ -37,7 +37,6 @@ pub struct KmerPartition {
|
||||
kmer_size: usize,
|
||||
minimizer_size: usize,
|
||||
writers: Vec<Option<SKFileWriter>>,
|
||||
format: Format,
|
||||
level: Level,
|
||||
closed: bool,
|
||||
}
|
||||
@@ -50,15 +49,7 @@ impl KmerPartition {
|
||||
minimizer_size: usize,
|
||||
force: bool,
|
||||
) -> SKResult<Self> {
|
||||
Self::create_with(
|
||||
path,
|
||||
n_bits,
|
||||
kmer_size,
|
||||
minimizer_size,
|
||||
Format::Zstd,
|
||||
Level::Three,
|
||||
force,
|
||||
)
|
||||
Self::create_with(path, n_bits, kmer_size, minimizer_size, Level::Three, force)
|
||||
}
|
||||
|
||||
pub fn create_with<P: AsRef<Path>>(
|
||||
@@ -66,7 +57,6 @@ impl KmerPartition {
|
||||
n_bits: usize,
|
||||
kmer_size: usize,
|
||||
minimizer_size: usize,
|
||||
format: Format,
|
||||
level: Level,
|
||||
force: bool,
|
||||
) -> SKResult<Self> {
|
||||
@@ -95,7 +85,6 @@ impl KmerPartition {
|
||||
kmer_size,
|
||||
minimizer_size,
|
||||
writers,
|
||||
format,
|
||||
level,
|
||||
closed: false,
|
||||
};
|
||||
@@ -116,13 +105,6 @@ impl KmerPartition {
|
||||
let meta: PartitionMeta = serde_json::from_reader(fs::File::open(&meta_path)?)
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
let format = match meta.format.as_str() {
|
||||
"gzip" => Format::Gzip,
|
||||
"bzip2" => Format::Bzip,
|
||||
"lzma" => Format::Lzma,
|
||||
"zstd" => Format::Zstd,
|
||||
_ => Format::No,
|
||||
};
|
||||
let level = level_from_u32(meta.level);
|
||||
let n_partitions = 1usize << meta.n_bits;
|
||||
let writers = (0..n_partitions).map(|_| None).collect();
|
||||
@@ -133,7 +115,6 @@ impl KmerPartition {
|
||||
kmer_size: meta.kmer_size,
|
||||
minimizer_size: meta.minimizer_size,
|
||||
writers,
|
||||
format,
|
||||
level,
|
||||
closed: true, // read-only: writing is not allowed on an opened partition
|
||||
})
|
||||
@@ -203,9 +184,7 @@ impl KmerPartition {
|
||||
/// partition). Higher values reduce per-temp-file memory at the cost of
|
||||
/// more temporary file descriptors — all managed by the global fd pool.
|
||||
pub fn dereplicate(&self) -> SKResult<()> {
|
||||
let format = self.format;
|
||||
let level = self.level;
|
||||
let ext = format_ext(format);
|
||||
let root = &self.root_path;
|
||||
let available = System::new_all().available_memory();
|
||||
let n_threads = rayon::current_num_threads().max(1) as u64;
|
||||
@@ -218,9 +197,9 @@ impl KmerPartition {
|
||||
if !dir.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
let raw_path = dir.join(format!("raw.{ext}"));
|
||||
let raw_path = dir.join(format!("raw.{SK_EXT}"));
|
||||
let n_buckets = optimal_buckets(&raw_path, available_per_thread);
|
||||
dereplicate_partition(&dir, ext, format, level, n_buckets)
|
||||
dereplicate_partition(&dir, level, n_buckets)
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -241,7 +220,6 @@ impl KmerPartition {
|
||||
/// Partitions are processed in parallel via Rayon (one task per thread).
|
||||
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
|
||||
pub fn count_kmer(&self) -> SKResult<()> {
|
||||
let ext = format_ext(self.format);
|
||||
let root = &self.root_path;
|
||||
let k = self.kmer_size;
|
||||
|
||||
@@ -249,7 +227,7 @@ impl KmerPartition {
|
||||
.into_par_iter()
|
||||
.map(|i| {
|
||||
let dir = root.join(format!("part_{:05}", i));
|
||||
let dedup_path = dir.join(format!("dereplicated.{ext}"));
|
||||
let dedup_path = dir.join(format!("dereplicated.{SK_EXT}"));
|
||||
if !dedup_path.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -320,14 +298,6 @@ impl KmerPartition {
|
||||
n_bits,
|
||||
kmer_size: self.kmer_size,
|
||||
minimizer_size: self.minimizer_size,
|
||||
format: match self.format {
|
||||
Format::Gzip => "gzip",
|
||||
Format::Bzip => "bzip2",
|
||||
Format::Lzma => "lzma",
|
||||
Format::Zstd => "zstd",
|
||||
Format::No => "none",
|
||||
}
|
||||
.to_owned(),
|
||||
level: u32::from(self.level),
|
||||
};
|
||||
let f = fs::File::create(self.root_path.join(META_FILENAME))?;
|
||||
@@ -339,9 +309,8 @@ impl KmerPartition {
|
||||
if self.writers[partition].is_none() {
|
||||
let dir = self.root_path.join(format!("part_{:05}", partition));
|
||||
fs::create_dir_all(&dir)?;
|
||||
let ext = format_ext(self.format);
|
||||
let file_path = dir.join(format!("raw.{ext}"));
|
||||
let writer = SKFileWriter::create_with(file_path, self.format, self.level)?;
|
||||
let file_path = dir.join(format!("raw.{SK_EXT}"));
|
||||
let writer = SKFileWriter::create_with(file_path, Format::Zstd, self.level)?;
|
||||
self.writers[partition] = Some(writer);
|
||||
}
|
||||
Ok(self.writers[partition].as_mut().unwrap())
|
||||
@@ -408,34 +377,19 @@ fn level_from_u32(n: u32) -> Level {
|
||||
}
|
||||
}
|
||||
|
||||
fn format_ext(format: Format) -> &'static str {
|
||||
match format {
|
||||
Format::Gzip => "skmer.gz",
|
||||
Format::Bzip => "skmer.bz2",
|
||||
Format::Lzma => "skmer.xz",
|
||||
Format::Zstd => "skmer.zst",
|
||||
Format::No => "skmer",
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum value that fits in the 24-bit COUNT field of a SuperKmer header.
|
||||
const MAX_SK_COUNT: u64 = (1 << 24) - 1;
|
||||
|
||||
/// Deduplicate one partition directory in place (two-phase split + merge).
|
||||
fn dereplicate_partition(
|
||||
dir: &Path,
|
||||
ext: &str,
|
||||
format: Format,
|
||||
level: Level,
|
||||
n_temp: usize,
|
||||
) -> SKResult<()> {
|
||||
let raw_path = dir.join(format!("raw.{ext}"));
|
||||
fn dereplicate_partition(dir: &Path, level: Level, n_temp: usize) -> SKResult<()> {
|
||||
let raw_path = dir.join(format!("raw.{SK_EXT}"));
|
||||
if !raw_path.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let out_path = dir.join(format!("dereplicated.{ext}"));
|
||||
let mut writer = SKFileWriter::create_with(&out_path, format, level)?;
|
||||
let out_path = dir.join(format!("dereplicated.{SK_EXT}"));
|
||||
let mut writer = SKFileWriter::create_with(&out_path, Format::Zstd, level)?;
|
||||
|
||||
if n_temp == 1 {
|
||||
// ── Direct path: partition fits in memory, no split needed ────────────
|
||||
@@ -446,13 +400,13 @@ fn dereplicate_partition(
|
||||
// ── Phase 1: split raw file into temp buckets ─────────────────────────
|
||||
let temp_mask = (n_temp as u64) - 1;
|
||||
let temp_paths: Vec<PathBuf> = (0..n_temp)
|
||||
.map(|j| dir.join(format!("temp_{j:04}.{ext}")))
|
||||
.map(|j| dir.join(format!("temp_{j:04}.{SK_EXT}")))
|
||||
.collect();
|
||||
|
||||
{
|
||||
let mut writers: Vec<SKFileWriter> = temp_paths
|
||||
.iter()
|
||||
.map(|p| SKFileWriter::create_with(p, format, level))
|
||||
.map(|p| SKFileWriter::create_with(p, Format::Zstd, level))
|
||||
.collect::<SKResult<_>>()?;
|
||||
|
||||
let mut reader = SKFileReader::open(&raw_path)?;
|
||||
@@ -530,12 +484,8 @@ fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> {
|
||||
let mut reader = SKFileReader::open(dedup_path)?;
|
||||
while let Some(sk) = reader.read()? {
|
||||
pass1_superkmers += 1;
|
||||
let seql = sk.seql();
|
||||
if seql < k {
|
||||
continue;
|
||||
}
|
||||
for pos in 0..=(seql - k) {
|
||||
seen.insert(sk.kmer(pos, k).map_err(io::Error::other)?.canonical(k));
|
||||
for kmer in sk.iter_canonical_kmers(k) {
|
||||
seen.insert(kmer);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -583,8 +533,7 @@ fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> {
|
||||
continue;
|
||||
}
|
||||
pass2_count_sum += sk_count as u64;
|
||||
for pos in 0..=(seql - k) {
|
||||
let kmer = sk.kmer(pos, k).map_err(io::Error::other)?.canonical(k);
|
||||
for kmer in sk.iter_canonical_kmers(k) {
|
||||
if let Some(idx) = mphf.get(&kmer) {
|
||||
counts[idx as usize] = counts[idx as usize].saturating_add(sk_count);
|
||||
pass2_kmer_hits += 1;
|
||||
|
||||
@@ -27,7 +27,7 @@
|
||||
//! ```
|
||||
//! use obikrope::{Rope, RopeCursor};
|
||||
//!
|
||||
//! let mut rope = Rope::new();
|
||||
//! let mut rope = Rope::new(None);
|
||||
//! rope.push(b"ACGT".to_vec());
|
||||
//!
|
||||
//! let cursor = rope.fw_cursor();
|
||||
|
||||
@@ -333,8 +333,6 @@ impl SuperKmer {
|
||||
|
||||
/// Extract the canonical kmer of length k starting at nucleotide position i (0-based).
|
||||
///
|
||||
/// Equivalent to `self.kmer(i, k)?.canonical(k)` but avoids the redundant `revcomp` call
|
||||
/// when the super-kmer is already in canonical form (which is the normal case).
|
||||
/// Returns an error if k is invalid (0 or > 32) or if position i + k exceeds the sequence length.
|
||||
pub fn canonical_kmer(&self, i: usize, k: usize) -> Result<Kmer, KmerError> {
|
||||
Ok(self.kmer(i, k)?.canonical(k))
|
||||
@@ -367,6 +365,16 @@ impl SuperKmer {
|
||||
true
|
||||
}
|
||||
|
||||
/// Iterate over all kmers of length `k` in order, yielding each as a left-aligned [`Kmer`].
|
||||
pub fn iter_kmers(&self, k: usize) -> impl Iterator<Item = Kmer> + '_ {
|
||||
SKKmerIter::new(self, k)
|
||||
}
|
||||
|
||||
/// Iterate over all canonical kmers of length `k` in order.
|
||||
pub fn iter_canonical_kmers(&self, k: usize) -> impl Iterator<Item = Kmer> + '_ {
|
||||
self.iter_kmers(k).map(move |km| km.canonical(k))
|
||||
}
|
||||
|
||||
/// Returns the XXH3 hash of the super-kmer sequence.
|
||||
pub fn hash(&self) -> u64 {
|
||||
if self.is_canonical() {
|
||||
@@ -379,6 +387,53 @@ impl SuperKmer {
|
||||
}
|
||||
}
|
||||
|
||||
struct SKKmerIter<'a> {
|
||||
skmer: &'a SuperKmer,
|
||||
mask: u64,
|
||||
lshift: usize,
|
||||
current: u64,
|
||||
pos: usize,
|
||||
max_pos: usize,
|
||||
}
|
||||
|
||||
impl<'a> SKKmerIter<'a> {
|
||||
fn new(skmer: &'a SuperKmer, k: usize) -> Self {
|
||||
let seql = skmer.seql();
|
||||
let lshift = 64 - k * 2;
|
||||
let mask = ((!0u128) << (lshift + 2)) as u64;
|
||||
Self {
|
||||
skmer,
|
||||
mask,
|
||||
lshift,
|
||||
current: if seql >= k { skmer.kmer(0, k).unwrap().raw() } else { 0 },
|
||||
pos: k,
|
||||
max_pos: seql,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for SKKmerIter<'a> {
|
||||
type Item = Kmer;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.pos > self.max_pos {
|
||||
return None;
|
||||
}
|
||||
// Emit current kmer first, then slide the window forward.
|
||||
let result = Kmer::from_raw(self.current);
|
||||
if self.pos < self.max_pos {
|
||||
let byte_pos = self.pos / 4;
|
||||
// Nucleotide at position r within its byte occupies bits 7-2r (MSB) and 6-2r (LSB).
|
||||
// Extract right-aligned, then place at lshift.
|
||||
let inner_shift = 6 - 2 * (self.pos & 3);
|
||||
let nuc = (((self.skmer.seq[byte_pos] >> inner_shift) & 3) as u64) << self.lshift;
|
||||
self.current = ((self.current << 2) & self.mask) | nuc;
|
||||
}
|
||||
self.pos += 1;
|
||||
Some(result)
|
||||
}
|
||||
}
|
||||
|
||||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
fn complement(base: u8) -> u8 {
|
||||
@@ -755,4 +810,121 @@ mod tests {
|
||||
sk.set_minimizer_pos(3);
|
||||
assert_eq!(sk.to_ascii(), ascii);
|
||||
}
|
||||
|
||||
// ── iter_kmers ────────────────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_count() {
|
||||
let ascii = b"ACGTACGTACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
for k in [1usize, 3, 4, 5, 8, 12] {
|
||||
let n = sk.iter_kmers(k).count();
|
||||
assert_eq!(n, ascii.len() - k + 1, "count mismatch for k={k}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_first_is_kmer_0() {
|
||||
let ascii = b"ACGTACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
for k in 1..=ascii.len() {
|
||||
let first = sk.iter_kmers(k).next().unwrap();
|
||||
assert_eq!(first, sk.kmer(0, k).unwrap(), "k={k}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_matches_kmer_at_each_position() {
|
||||
let ascii = b"ACGTACGTACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
let k = 4;
|
||||
let kmers: Vec<Kmer> = sk.iter_kmers(k).collect();
|
||||
assert_eq!(kmers.len(), ascii.len() - k + 1);
|
||||
for (i, &km) in kmers.iter().enumerate() {
|
||||
assert_eq!(km, sk.kmer(i, k).unwrap(), "mismatch at pos {i}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_single_when_seql_eq_k() {
|
||||
let ascii = b"ACGTACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
let k = ascii.len();
|
||||
let kmers: Vec<Kmer> = sk.iter_kmers(k).collect();
|
||||
assert_eq!(kmers.len(), 1);
|
||||
assert_eq!(kmers[0], sk.kmer(0, k).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_two_when_seql_eq_k_plus_one() {
|
||||
let ascii = b"ACGTACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
let k = ascii.len() - 1;
|
||||
let kmers: Vec<Kmer> = sk.iter_kmers(k).collect();
|
||||
assert_eq!(kmers.len(), 2);
|
||||
assert_eq!(kmers[0], sk.kmer(0, k).unwrap());
|
||||
assert_eq!(kmers[1], sk.kmer(1, k).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_all_k_values() {
|
||||
// For every valid k, each yielded kmer must match kmer(i, k).
|
||||
let ascii = b"ACGTACGTACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
let seql = ascii.len();
|
||||
for k in 1..=seql {
|
||||
let kmers: Vec<Kmer> = sk.iter_kmers(k).collect();
|
||||
assert_eq!(kmers.len(), seql - k + 1, "k={k}");
|
||||
for (i, &km) in kmers.iter().enumerate() {
|
||||
assert_eq!(km, sk.kmer(i, k).unwrap(), "k={k}, pos={i}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_crosses_byte_boundary() {
|
||||
// Positions 3→4 and 7→8 cross a 4-nucleotide byte boundary.
|
||||
let ascii = b"ACGTACGTACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
let k = 3;
|
||||
let kmers: Vec<Kmer> = sk.iter_kmers(k).collect();
|
||||
for boundary in [3usize, 4, 7, 8] {
|
||||
if boundary + 1 < kmers.len() {
|
||||
assert_eq!(
|
||||
kmers[boundary],
|
||||
sk.kmer(boundary, k).unwrap(),
|
||||
"pos={boundary}"
|
||||
);
|
||||
assert_eq!(
|
||||
kmers[boundary + 1],
|
||||
sk.kmer(boundary + 1, k).unwrap(),
|
||||
"pos={}",
|
||||
boundary + 1
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_k1_yields_all_nucleotides() {
|
||||
let ascii = b"ACGT";
|
||||
let sk = SuperKmer::from_ascii(ascii);
|
||||
let kmers: Vec<Kmer> = sk.iter_kmers(1).collect();
|
||||
assert_eq!(kmers.len(), 4);
|
||||
for (i, &km) in kmers.iter().enumerate() {
|
||||
assert_eq!(km, sk.kmer(i, 1).unwrap(), "pos={i}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iter_kmers_long_sequence() {
|
||||
let ascii = make_seq(20);
|
||||
let sk = SuperKmer::from_ascii(&ascii);
|
||||
let k = 7;
|
||||
let kmers: Vec<Kmer> = sk.iter_kmers(k).collect();
|
||||
assert_eq!(kmers.len(), ascii.len() - k + 1);
|
||||
for (i, &km) in kmers.iter().enumerate() {
|
||||
assert_eq!(km, sk.kmer(i, k).unwrap(), "pos={i}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,39 +168,43 @@ mod tests {
|
||||
.collect()
|
||||
}
|
||||
|
||||
// k=11, m=5 — valeurs minimales du projet (k ∈ [11,31])
|
||||
const K: usize = 11;
|
||||
const M: usize = 5;
|
||||
|
||||
#[test]
|
||||
fn single_segment_one_superkmer() {
|
||||
let out = run_nofilter(b"ACGTACGT\x00", 4, 2);
|
||||
let out = run_nofilter(b"ACGTACGTACGTACGTACGT\x00", K, M);
|
||||
assert!(!out.is_empty());
|
||||
let total: Vec<u8> = out.into_iter().flatten().collect();
|
||||
assert!(total.len() >= 4);
|
||||
assert!(total.len() >= K);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn segment_shorter_than_k_emits_nothing() {
|
||||
let out = run_nofilter(b"ACG\x00", 4, 2);
|
||||
let out = run_nofilter(b"ACGTACGT\x00", K, M);
|
||||
assert_eq!(out, Vec::<Vec<u8>>::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_input_emits_nothing() {
|
||||
let out = run_nofilter(b"", 4, 2);
|
||||
let out = run_nofilter(b"", K, M);
|
||||
assert_eq!(out, Vec::<Vec<u8>>::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn two_segments_both_emitted() {
|
||||
let out = run_nofilter(b"ACGTACGT\x00TTTTTTTT\x00", 4, 2);
|
||||
let out = run_nofilter(b"ACGTACGTACGTACGT\x00TGCATGCATGCATGCA\x00", K, M);
|
||||
assert!(!out.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn low_complexity_kmer_is_rejected() {
|
||||
let out_pass = run_nofilter(b"AAAAAAAAACGT\x00", 4, 2);
|
||||
let out_pass = run_nofilter(b"AAAAAAAAAAAACGTACGTACGT\x00", K, M);
|
||||
assert!(!out_pass.is_empty());
|
||||
|
||||
let rope = make_rope(b"AAAAAAAA\x00");
|
||||
let out_reject: Vec<Vec<u8>> = SuperKmerIter::new(&rope, 4, 2, 6, 0.9)
|
||||
let rope = make_rope(b"AAAAAAAAAAAAAAAAAAAA\x00");
|
||||
let out_reject: Vec<Vec<u8>> = SuperKmerIter::new(&rope, K, M, 6, 0.9)
|
||||
.map(|sk| sk.to_ascii())
|
||||
.collect();
|
||||
assert!(out_reject.is_empty());
|
||||
@@ -208,12 +212,12 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn multi_slice_rope() {
|
||||
let data = b"ACGTACGTACGT\x00";
|
||||
let data = b"ACGTACGTACGTACGTACGT\x00";
|
||||
let mid = data.len() / 2;
|
||||
let mut rope = Rope::new(None);
|
||||
rope.push(data[..mid].to_vec());
|
||||
rope.push(data[mid..].to_vec());
|
||||
let out: Vec<Vec<u8>> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0)
|
||||
let out: Vec<Vec<u8>> = SuperKmerIter::new(&rope, K, M, 1, 0.0)
|
||||
.map(|sk| sk.to_ascii())
|
||||
.collect();
|
||||
assert!(!out.is_empty());
|
||||
@@ -221,8 +225,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn yields_minimizer_value() {
|
||||
let rope = make_rope(b"ACGTACGT\x00");
|
||||
let results: Vec<SuperKmer> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0).collect();
|
||||
let rope = make_rope(b"ACGTACGTACGTACGTACGT\x00");
|
||||
let results: Vec<SuperKmer> = SuperKmerIter::new(&rope, K, M, 1, 0.0).collect();
|
||||
assert!(!results.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,3 +14,11 @@ pub(crate) mod rolling_stat;
|
||||
|
||||
pub use iter::SuperKmerIter;
|
||||
pub use scratch::SuperKmerScratch;
|
||||
|
||||
use obikrope::Rope;
|
||||
use obikseq::superkmer::SuperKmer;
|
||||
|
||||
/// Collect all super-kmers from a normalised rope chunk.
|
||||
pub fn build_superkmers(rope: Rope, k: usize, m: usize, level_max: usize, theta: f64) -> Vec<SuperKmer> {
|
||||
SuperKmerIter::new(&rope, k, m, level_max, theta).collect()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user