diff --git a/.gitignore b/.gitignore index 5381ab2..a9d6044 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ data-stress *.pb *.json *.bin +*.bin +*.json diff --git a/src/obikmer/src/cli.rs b/src/obikmer/src/cli.rs new file mode 100644 index 0000000..8876b4d --- /dev/null +++ b/src/obikmer/src/cli.rs @@ -0,0 +1,75 @@ +use std::io; +use std::path::PathBuf; + +use clap::Args; +use obikrope::Rope; +use obikseq::superkmer::SuperKmer; + +// ── Shared arguments ────────────────────────────────────────────────────────── + +#[derive(Args)] +pub struct CommonArgs { + /// Input files or directories (FASTA/FASTQ, optionally gzip-compressed) + #[arg(num_args = 1..)] + pub inputs: Vec, + + /// k-mer size + #[arg(short, long, default_value_t = 31)] + pub kmer_size: usize, + + /// Minimizer size + #[arg(short, long, default_value_t = 11)] + pub minimizer_size: usize, + + /// Entropy threshold (k-mers with score ≤ theta are rejected) + #[arg(long, default_value_t = 0.7)] + pub theta: f64, + + /// Maximum sub-word size for entropy computation + #[arg(long, default_value_t = 6)] + pub level_max: usize, + + /// Number of bits to encode partitions (allows up to 2^partition_bits partitions) + #[arg(short, long, default_value_t = 8)] + pub partition_bits: usize, + + /// Number of worker threads + #[arg( + short = 'T', + long, + default_value_t = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + )] + pub threads: usize, +} + +// ── Pipeline data carrier ───────────────────────────────────────────────────── + +pub enum PipelineData { + Path(PathBuf), + RawChunk(Rope), + NormChunk(Rope), + Batch(Vec), +} + +// SAFETY: Rope contains Cell which is !Sync, but pipeline ownership transfers +// exclusively through channels — no item is ever shared across threads. +unsafe impl Send for PipelineData {} +unsafe impl Sync for PipelineData {} + +// ── I/O plumbing ────────────────────────────────────────────────────────────── + +pub fn open_chunks(path: PathBuf) -> io::Result> { + let path_str = path + .to_str() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?; + let iter = obiread::read_sequence_chunks(path_str)?; + Ok(iter.filter_map(|r| match r { + Ok(rope) => Some(rope), + Err(e) => { + eprintln!("chunk read error: {e}"); + None + } + })) +} diff --git a/src/obikmer/src/cmd/partition.rs b/src/obikmer/src/cmd/partition.rs index 84d28f4..5a8de05 100644 --- a/src/obikmer/src/cmd/partition.rs +++ b/src/obikmer/src/cmd/partition.rs @@ -1,117 +1,47 @@ use clap::Args; use obikpartitionner::KmerPartition; -use obikrope::Rope; use obikseq::superkmer::SuperKmer; use obipipeline::{WorkerPool, make_pipeline}; -use obiskbuilder::SuperKmerIter; use std::io; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tracing::info; +use crate::cli::{CommonArgs, PipelineData, open_chunks}; + #[derive(Args)] pub struct PartitionArgs { /// Output partition directory #[arg(short, long)] pub output: PathBuf, - /// Input files or directories (FASTA/FASTQ, optionally gzip-compressed) - #[arg(num_args = 1..)] - pub inputs: Vec, - - /// k-mer size - #[arg(short, long, default_value_t = 31)] - pub kmer_size: usize, - - /// Minimizer size - #[arg(short, long, default_value_t = 11)] - pub minimizer_size: usize, - - /// Entropy threshold (k-mers with score ≤ theta are rejected) - #[arg(long, default_value_t = 0.7)] - pub theta: f64, - - /// Maximum sub-word size for entropy computation - #[arg(long, default_value_t = 6)] - pub level_max: usize, - - /// Number of bits to encode partitions (allows up to 2^partition_bits partitions) - #[arg(short, long, default_value_t = 8)] - pub partition_bits: usize, - /// Overwrite output directory if it already exists #[arg(long, default_value_t = false)] pub force: bool, - /// Number of worker threads - #[arg( - short = 'T', - long, - default_value_t = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1))] - pub threads: usize, + #[command(flatten)] + pub common: CommonArgs, } -enum PipelineData { - Path(PathBuf), - RawChunk(Rope), - NormChunk(Rope), - Batch(Vec), -} - -// SAFETY: Rope contains Cell which is !Sync, but pipeline ownership transfers -// exclusively through channels — no item is ever shared across threads. -unsafe impl Send for PipelineData {} -unsafe impl Sync for PipelineData {} - // ── Stage functions ─────────────────────────────────────────────────────────── -fn open_chunks(path: PathBuf) -> io::Result> { - let path_str = path - .to_str() - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?; - let iter = obiread::read_sequence_chunks(path_str)?; - Ok(iter.filter_map(|r| match r { - Ok(rope) => Some(rope), - Err(e) => { - eprintln!("chunk read error: {e}"); - None - } - })) -} - -fn normalize(rope: Rope, k: usize) -> io::Result { - obiread::normalize_sequence_chunk(rope, k) -} - -fn build_superkmers( - rope: Rope, - k: usize, - m: usize, - level_max: usize, - theta: f64, -) -> Vec { - SuperKmerIter::new(&rope, k, m, level_max, theta).collect() -} - fn write_batch(mut batch: Vec, kp: &Mutex) -> io::Result<()> { kp.lock() .unwrap() .write_batch(&mut batch) - .map_err(|e| io::Error::other(e)) + .map_err(io::Error::other) } // ── Entry point ─────────────────────────────────────────────────────────────── pub fn run(args: PartitionArgs) { - let k = args.kmer_size; - let m = args.minimizer_size; - let theta = args.theta; - let level_max = args.level_max; - let n_workers = args.threads.max(1); + let k = args.common.kmer_size; + let m = args.common.minimizer_size; + let theta = args.common.theta; + let level_max = args.common.level_max; + let n_workers = args.common.threads.max(1); - let kp = KmerPartition::create(&args.output, args.partition_bits, k, m, args.force) + let kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force) .unwrap_or_else(|e| { eprintln!("error: {e}"); std::process::exit(1) @@ -119,16 +49,16 @@ pub fn run(args: PartitionArgs) { let kp = Arc::new(Mutex::new(kp)); let kp_sink = Arc::clone(&kp); - let paths = args.inputs.iter().map(PathBuf::from).collect(); + let paths = args.common.inputs.iter().map(PathBuf::from).collect(); let path_source = obiread::PathIter::new(paths); let pipeline = make_pipeline! { PipelineData, - source path_source => Path, - ||? open_chunks : Path => RawChunk, - |? { move |rope| normalize(rope, k) } : RawChunk => NormChunk, - | { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch, - sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch, + source path_source => Path, + ||? open_chunks : Path => RawChunk, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + | { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch, + sink? { move |batch| write_batch(batch, &kp_sink) } @ Batch, }; WorkerPool::new(pipeline, n_workers, 1).run(); diff --git a/src/obikmer/src/cmd/superkmer.rs b/src/obikmer/src/cmd/superkmer.rs index 39acfeb..88db465 100644 --- a/src/obikmer/src/cmd/superkmer.rs +++ b/src/obikmer/src/cmd/superkmer.rs @@ -4,94 +4,19 @@ use std::sync::{Arc, Mutex}; use clap::Args; use obifastwrite::write_scatter; -use obikrope::Rope; use obikseq::superkmer::SuperKmer; use obipipeline::{WorkerPool, make_pipeline}; -use obiskbuilder::SuperKmerIter; + +use crate::cli::{CommonArgs, PipelineData, open_chunks}; #[derive(Args)] pub struct SuperkmerArgs { - /// Input files or directories (FASTA/FASTQ, optionally gzip-compressed) - #[arg(num_args = 1..)] - pub inputs: Vec, - - /// k-mer size - #[arg(short, long, default_value_t = 31)] - pub kmer_size: usize, - - /// Minimizer size - #[arg(short, long, default_value_t = 11)] - pub minimizer_size: usize, - - /// Entropy threshold (k-mers with score ≤ theta are rejected) - #[arg(long, default_value_t = 0.7)] - pub theta: f64, - - /// Maximum sub-word size for entropy computation - #[arg(long, default_value_t = 6)] - pub level_max: usize, - - /// Number of bits to encode partitions (allows up to 2^partition_bits partitions) - #[arg(short, long, default_value_t = 8)] - pub partition_bits: usize, - - /// Number of worker threads - #[arg( - short = 'T', - long, - default_value_t = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1))] - pub threads: usize, + #[command(flatten)] + pub common: CommonArgs, } -enum PipelineData { - Path(PathBuf), - RawChunk(Rope), - NormChunk(Rope), - Batch(Vec), -} - -// SAFETY: Rope contains Cell which is !Sync, but pipeline ownership transfers -// exclusively through channels — no item is ever shared across threads. -unsafe impl Send for PipelineData {} -unsafe impl Sync for PipelineData {} - // ── Stage functions ─────────────────────────────────────────────────────────── -/// Opens a sequence file and returns an iterator over its raw Rope chunks. -/// Chunk-level I/O errors are logged and skipped. -fn open_chunks(path: PathBuf) -> io::Result> { - let path_str = path - .to_str() - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?; - let iter = obiread::read_sequence_chunks(path_str)?; - Ok(iter.filter_map(|r| match r { - Ok(rope) => Some(rope), - Err(e) => { - eprintln!("chunk read error: {e}"); - None - } - })) -} - -/// Normalises a raw sequence chunk (FASTA or FASTQ) into a compact ACGT/NUL rope. -fn normalize(rope: Rope, k: usize) -> io::Result { - obiread::normalize_sequence_chunk(rope, k) -} - -/// Extracts all super-kmers from a normalised rope. -fn build_superkmers( - rope: Rope, - k: usize, - m: usize, - level_max: usize, - theta: f64, -) -> Vec { - SuperKmerIter::new(&rope, k, m, level_max, theta).collect() -} - -/// Writes a batch of super-kmers to the output sink. fn write_batch( batch: Vec, out: &Mutex>, @@ -104,7 +29,7 @@ fn write_batch( for sk in batch { let minimizer = sk .kmer(sk.minimizer_pos() as usize, m) - .map_err(|e| std::io::Error::other(e))? + .map_err(io::Error::other)? .canonical(m); let partition = (minimizer.hash(m) & partition_mask) as usize; write_scatter(&sk, &mut *w, k, m, partition, minimizer)?; @@ -112,26 +37,17 @@ fn write_batch( Ok(()) } -#[inline] -fn mix64(x: u64) -> u64 { - let x = x ^ (x >> 30); - let x = x.wrapping_mul(0xbf58476d1ce4e5b9); - let x = x ^ (x >> 27); - let x = x.wrapping_mul(0x94d049bb133111eb); - x ^ (x >> 31) -} - // ── Entry point ─────────────────────────────────────────────────────────────── pub fn run(args: SuperkmerArgs) { - let k = args.kmer_size; - let m = args.minimizer_size; - let theta = args.theta; - let level_max = args.level_max; - let partition_bits = args.partition_bits; - let n_workers = args.threads.max(1); + let k = args.common.kmer_size; + let m = args.common.minimizer_size; + let theta = args.common.theta; + let level_max = args.common.level_max; + let partition_bits = args.common.partition_bits; + let n_workers = args.common.threads.max(1); - let paths = args.inputs.iter().map(PathBuf::from).collect(); + let paths = args.common.inputs.iter().map(PathBuf::from).collect(); let path_source = obiread::PathIter::new(paths); let out = Arc::new(Mutex::new(BufWriter::new(io::stdout()))); @@ -139,11 +55,11 @@ pub fn run(args: SuperkmerArgs) { let pipeline = make_pipeline! { PipelineData, - source path_source => Path, - ||? open_chunks : Path => RawChunk, - |? { move |rope| normalize(rope, k) } : RawChunk => NormChunk, - | { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch, - sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch, + source path_source => Path, + ||? open_chunks : Path => RawChunk, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + | { move |rope| obiskbuilder::build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch, + sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch, }; WorkerPool::new(pipeline, n_workers, 1).run(); diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index a780237..c8f25e0 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -1,3 +1,4 @@ +mod cli; mod cmd; use clap::{Parser, Subcommand}; @@ -22,7 +23,7 @@ enum Commands { fn main() { fmt() - .with_env_filter(EnvFilter::from_default_env()) + .with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) .with_writer(std::io::stderr) .init(); diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index 40ef113..52783f4 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -20,13 +20,13 @@ use rayon::prelude::*; use serde::{Deserialize, Serialize}; const META_FILENAME: &str = "partition.meta"; +const SK_EXT: &str = "skmer.zst"; #[derive(Serialize, Deserialize)] struct PartitionMeta { n_bits: usize, kmer_size: usize, minimizer_size: usize, - format: String, level: u32, } @@ -37,7 +37,6 @@ pub struct KmerPartition { kmer_size: usize, minimizer_size: usize, writers: Vec>, - format: Format, level: Level, closed: bool, } @@ -50,15 +49,7 @@ impl KmerPartition { minimizer_size: usize, force: bool, ) -> SKResult { - Self::create_with( - path, - n_bits, - kmer_size, - minimizer_size, - Format::Zstd, - Level::Three, - force, - ) + Self::create_with(path, n_bits, kmer_size, minimizer_size, Level::Three, force) } pub fn create_with>( @@ -66,7 +57,6 @@ impl KmerPartition { n_bits: usize, kmer_size: usize, minimizer_size: usize, - format: Format, level: Level, force: bool, ) -> SKResult { @@ -95,7 +85,6 @@ impl KmerPartition { kmer_size, minimizer_size, writers, - format, level, closed: false, }; @@ -116,13 +105,6 @@ impl KmerPartition { let meta: PartitionMeta = serde_json::from_reader(fs::File::open(&meta_path)?) .map_err(io::Error::other)?; - let format = match meta.format.as_str() { - "gzip" => Format::Gzip, - "bzip2" => Format::Bzip, - "lzma" => Format::Lzma, - "zstd" => Format::Zstd, - _ => Format::No, - }; let level = level_from_u32(meta.level); let n_partitions = 1usize << meta.n_bits; let writers = (0..n_partitions).map(|_| None).collect(); @@ -133,7 +115,6 @@ impl KmerPartition { kmer_size: meta.kmer_size, minimizer_size: meta.minimizer_size, writers, - format, level, closed: true, // read-only: writing is not allowed on an opened partition }) @@ -203,9 +184,7 @@ impl KmerPartition { /// partition). Higher values reduce per-temp-file memory at the cost of /// more temporary file descriptors — all managed by the global fd pool. pub fn dereplicate(&self) -> SKResult<()> { - let format = self.format; let level = self.level; - let ext = format_ext(format); let root = &self.root_path; let available = System::new_all().available_memory(); let n_threads = rayon::current_num_threads().max(1) as u64; @@ -218,9 +197,9 @@ impl KmerPartition { if !dir.exists() { return Ok(()); } - let raw_path = dir.join(format!("raw.{ext}")); + let raw_path = dir.join(format!("raw.{SK_EXT}")); let n_buckets = optimal_buckets(&raw_path, available_per_thread); - dereplicate_partition(&dir, ext, format, level, n_buckets) + dereplicate_partition(&dir, level, n_buckets) }) .collect(); @@ -241,7 +220,6 @@ impl KmerPartition { /// Partitions are processed in parallel via Rayon (one task per thread). /// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously. pub fn count_kmer(&self) -> SKResult<()> { - let ext = format_ext(self.format); let root = &self.root_path; let k = self.kmer_size; @@ -249,7 +227,7 @@ impl KmerPartition { .into_par_iter() .map(|i| { let dir = root.join(format!("part_{:05}", i)); - let dedup_path = dir.join(format!("dereplicated.{ext}")); + let dedup_path = dir.join(format!("dereplicated.{SK_EXT}")); if !dedup_path.exists() { return Ok(()); } @@ -320,14 +298,6 @@ impl KmerPartition { n_bits, kmer_size: self.kmer_size, minimizer_size: self.minimizer_size, - format: match self.format { - Format::Gzip => "gzip", - Format::Bzip => "bzip2", - Format::Lzma => "lzma", - Format::Zstd => "zstd", - Format::No => "none", - } - .to_owned(), level: u32::from(self.level), }; let f = fs::File::create(self.root_path.join(META_FILENAME))?; @@ -339,9 +309,8 @@ impl KmerPartition { if self.writers[partition].is_none() { let dir = self.root_path.join(format!("part_{:05}", partition)); fs::create_dir_all(&dir)?; - let ext = format_ext(self.format); - let file_path = dir.join(format!("raw.{ext}")); - let writer = SKFileWriter::create_with(file_path, self.format, self.level)?; + let file_path = dir.join(format!("raw.{SK_EXT}")); + let writer = SKFileWriter::create_with(file_path, Format::Zstd, self.level)?; self.writers[partition] = Some(writer); } Ok(self.writers[partition].as_mut().unwrap()) @@ -408,34 +377,19 @@ fn level_from_u32(n: u32) -> Level { } } -fn format_ext(format: Format) -> &'static str { - match format { - Format::Gzip => "skmer.gz", - Format::Bzip => "skmer.bz2", - Format::Lzma => "skmer.xz", - Format::Zstd => "skmer.zst", - Format::No => "skmer", - } -} /// Maximum value that fits in the 24-bit COUNT field of a SuperKmer header. const MAX_SK_COUNT: u64 = (1 << 24) - 1; /// Deduplicate one partition directory in place (two-phase split + merge). -fn dereplicate_partition( - dir: &Path, - ext: &str, - format: Format, - level: Level, - n_temp: usize, -) -> SKResult<()> { - let raw_path = dir.join(format!("raw.{ext}")); +fn dereplicate_partition(dir: &Path, level: Level, n_temp: usize) -> SKResult<()> { + let raw_path = dir.join(format!("raw.{SK_EXT}")); if !raw_path.exists() { return Ok(()); } - let out_path = dir.join(format!("dereplicated.{ext}")); - let mut writer = SKFileWriter::create_with(&out_path, format, level)?; + let out_path = dir.join(format!("dereplicated.{SK_EXT}")); + let mut writer = SKFileWriter::create_with(&out_path, Format::Zstd, level)?; if n_temp == 1 { // ── Direct path: partition fits in memory, no split needed ──────────── @@ -446,13 +400,13 @@ fn dereplicate_partition( // ── Phase 1: split raw file into temp buckets ───────────────────────── let temp_mask = (n_temp as u64) - 1; let temp_paths: Vec = (0..n_temp) - .map(|j| dir.join(format!("temp_{j:04}.{ext}"))) + .map(|j| dir.join(format!("temp_{j:04}.{SK_EXT}"))) .collect(); { let mut writers: Vec = temp_paths .iter() - .map(|p| SKFileWriter::create_with(p, format, level)) + .map(|p| SKFileWriter::create_with(p, Format::Zstd, level)) .collect::>()?; let mut reader = SKFileReader::open(&raw_path)?; @@ -530,12 +484,8 @@ fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> { let mut reader = SKFileReader::open(dedup_path)?; while let Some(sk) = reader.read()? { pass1_superkmers += 1; - let seql = sk.seql(); - if seql < k { - continue; - } - for pos in 0..=(seql - k) { - seen.insert(sk.kmer(pos, k).map_err(io::Error::other)?.canonical(k)); + for kmer in sk.iter_canonical_kmers(k) { + seen.insert(kmer); } } } @@ -583,8 +533,7 @@ fn count_partition(dir: &Path, dedup_path: &Path, k: usize) -> SKResult<()> { continue; } pass2_count_sum += sk_count as u64; - for pos in 0..=(seql - k) { - let kmer = sk.kmer(pos, k).map_err(io::Error::other)?.canonical(k); + for kmer in sk.iter_canonical_kmers(k) { if let Some(idx) = mphf.get(&kmer) { counts[idx as usize] = counts[idx as usize].saturating_add(sk_count); pass2_kmer_hits += 1; diff --git a/src/obikrope/src/cursor.rs b/src/obikrope/src/cursor.rs index 3d1719c..4ab40f7 100644 --- a/src/obikrope/src/cursor.rs +++ b/src/obikrope/src/cursor.rs @@ -27,7 +27,7 @@ //! ``` //! use obikrope::{Rope, RopeCursor}; //! -//! let mut rope = Rope::new(); +//! let mut rope = Rope::new(None); //! rope.push(b"ACGT".to_vec()); //! //! let cursor = rope.fw_cursor(); diff --git a/src/obikseq/src/superkmer.rs b/src/obikseq/src/superkmer.rs index 1175b9f..9a0332e 100644 --- a/src/obikseq/src/superkmer.rs +++ b/src/obikseq/src/superkmer.rs @@ -333,8 +333,6 @@ impl SuperKmer { /// Extract the canonical kmer of length k starting at nucleotide position i (0-based). /// - /// Equivalent to `self.kmer(i, k)?.canonical(k)` but avoids the redundant `revcomp` call - /// when the super-kmer is already in canonical form (which is the normal case). /// Returns an error if k is invalid (0 or > 32) or if position i + k exceeds the sequence length. pub fn canonical_kmer(&self, i: usize, k: usize) -> Result { Ok(self.kmer(i, k)?.canonical(k)) @@ -367,6 +365,16 @@ impl SuperKmer { true } + /// Iterate over all kmers of length `k` in order, yielding each as a left-aligned [`Kmer`]. + pub fn iter_kmers(&self, k: usize) -> impl Iterator + '_ { + SKKmerIter::new(self, k) + } + + /// Iterate over all canonical kmers of length `k` in order. + pub fn iter_canonical_kmers(&self, k: usize) -> impl Iterator + '_ { + self.iter_kmers(k).map(move |km| km.canonical(k)) + } + /// Returns the XXH3 hash of the super-kmer sequence. pub fn hash(&self) -> u64 { if self.is_canonical() { @@ -379,6 +387,53 @@ impl SuperKmer { } } +struct SKKmerIter<'a> { + skmer: &'a SuperKmer, + mask: u64, + lshift: usize, + current: u64, + pos: usize, + max_pos: usize, +} + +impl<'a> SKKmerIter<'a> { + fn new(skmer: &'a SuperKmer, k: usize) -> Self { + let seql = skmer.seql(); + let lshift = 64 - k * 2; + let mask = ((!0u128) << (lshift + 2)) as u64; + Self { + skmer, + mask, + lshift, + current: if seql >= k { skmer.kmer(0, k).unwrap().raw() } else { 0 }, + pos: k, + max_pos: seql, + } + } +} + +impl<'a> Iterator for SKKmerIter<'a> { + type Item = Kmer; + + fn next(&mut self) -> Option { + if self.pos > self.max_pos { + return None; + } + // Emit current kmer first, then slide the window forward. + let result = Kmer::from_raw(self.current); + if self.pos < self.max_pos { + let byte_pos = self.pos / 4; + // Nucleotide at position r within its byte occupies bits 7-2r (MSB) and 6-2r (LSB). + // Extract right-aligned, then place at lshift. + let inner_shift = 6 - 2 * (self.pos & 3); + let nuc = (((self.skmer.seq[byte_pos] >> inner_shift) & 3) as u64) << self.lshift; + self.current = ((self.current << 2) & self.mask) | nuc; + } + self.pos += 1; + Some(result) + } +} + // ── helpers ─────────────────────────────────────────────────────────────────── fn complement(base: u8) -> u8 { @@ -755,4 +810,121 @@ mod tests { sk.set_minimizer_pos(3); assert_eq!(sk.to_ascii(), ascii); } + + // ── iter_kmers ──────────────────────────────────────────────────────────── + + #[test] + fn iter_kmers_count() { + let ascii = b"ACGTACGTACGT"; + let sk = SuperKmer::from_ascii(ascii); + for k in [1usize, 3, 4, 5, 8, 12] { + let n = sk.iter_kmers(k).count(); + assert_eq!(n, ascii.len() - k + 1, "count mismatch for k={k}"); + } + } + + #[test] + fn iter_kmers_first_is_kmer_0() { + let ascii = b"ACGTACGT"; + let sk = SuperKmer::from_ascii(ascii); + for k in 1..=ascii.len() { + let first = sk.iter_kmers(k).next().unwrap(); + assert_eq!(first, sk.kmer(0, k).unwrap(), "k={k}"); + } + } + + #[test] + fn iter_kmers_matches_kmer_at_each_position() { + let ascii = b"ACGTACGTACGT"; + let sk = SuperKmer::from_ascii(ascii); + let k = 4; + let kmers: Vec = sk.iter_kmers(k).collect(); + assert_eq!(kmers.len(), ascii.len() - k + 1); + for (i, &km) in kmers.iter().enumerate() { + assert_eq!(km, sk.kmer(i, k).unwrap(), "mismatch at pos {i}"); + } + } + + #[test] + fn iter_kmers_single_when_seql_eq_k() { + let ascii = b"ACGTACGT"; + let sk = SuperKmer::from_ascii(ascii); + let k = ascii.len(); + let kmers: Vec = sk.iter_kmers(k).collect(); + assert_eq!(kmers.len(), 1); + assert_eq!(kmers[0], sk.kmer(0, k).unwrap()); + } + + #[test] + fn iter_kmers_two_when_seql_eq_k_plus_one() { + let ascii = b"ACGTACGT"; + let sk = SuperKmer::from_ascii(ascii); + let k = ascii.len() - 1; + let kmers: Vec = sk.iter_kmers(k).collect(); + assert_eq!(kmers.len(), 2); + assert_eq!(kmers[0], sk.kmer(0, k).unwrap()); + assert_eq!(kmers[1], sk.kmer(1, k).unwrap()); + } + + #[test] + fn iter_kmers_all_k_values() { + // For every valid k, each yielded kmer must match kmer(i, k). + let ascii = b"ACGTACGTACGT"; + let sk = SuperKmer::from_ascii(ascii); + let seql = ascii.len(); + for k in 1..=seql { + let kmers: Vec = sk.iter_kmers(k).collect(); + assert_eq!(kmers.len(), seql - k + 1, "k={k}"); + for (i, &km) in kmers.iter().enumerate() { + assert_eq!(km, sk.kmer(i, k).unwrap(), "k={k}, pos={i}"); + } + } + } + + #[test] + fn iter_kmers_crosses_byte_boundary() { + // Positions 3→4 and 7→8 cross a 4-nucleotide byte boundary. + let ascii = b"ACGTACGTACGT"; + let sk = SuperKmer::from_ascii(ascii); + let k = 3; + let kmers: Vec = sk.iter_kmers(k).collect(); + for boundary in [3usize, 4, 7, 8] { + if boundary + 1 < kmers.len() { + assert_eq!( + kmers[boundary], + sk.kmer(boundary, k).unwrap(), + "pos={boundary}" + ); + assert_eq!( + kmers[boundary + 1], + sk.kmer(boundary + 1, k).unwrap(), + "pos={}", + boundary + 1 + ); + } + } + } + + #[test] + fn iter_kmers_k1_yields_all_nucleotides() { + let ascii = b"ACGT"; + let sk = SuperKmer::from_ascii(ascii); + let kmers: Vec = sk.iter_kmers(1).collect(); + assert_eq!(kmers.len(), 4); + for (i, &km) in kmers.iter().enumerate() { + assert_eq!(km, sk.kmer(i, 1).unwrap(), "pos={i}"); + } + } + + #[test] + fn iter_kmers_long_sequence() { + let ascii = make_seq(20); + let sk = SuperKmer::from_ascii(&ascii); + let k = 7; + let kmers: Vec = sk.iter_kmers(k).collect(); + assert_eq!(kmers.len(), ascii.len() - k + 1); + for (i, &km) in kmers.iter().enumerate() { + assert_eq!(km, sk.kmer(i, k).unwrap(), "pos={i}"); + } + } } diff --git a/src/obiskbuilder/src/iter.rs b/src/obiskbuilder/src/iter.rs index 6feeb99..79980a0 100644 --- a/src/obiskbuilder/src/iter.rs +++ b/src/obiskbuilder/src/iter.rs @@ -168,39 +168,43 @@ mod tests { .collect() } + // k=11, m=5 — valeurs minimales du projet (k ∈ [11,31]) + const K: usize = 11; + const M: usize = 5; + #[test] fn single_segment_one_superkmer() { - let out = run_nofilter(b"ACGTACGT\x00", 4, 2); + let out = run_nofilter(b"ACGTACGTACGTACGTACGT\x00", K, M); assert!(!out.is_empty()); let total: Vec = out.into_iter().flatten().collect(); - assert!(total.len() >= 4); + assert!(total.len() >= K); } #[test] fn segment_shorter_than_k_emits_nothing() { - let out = run_nofilter(b"ACG\x00", 4, 2); + let out = run_nofilter(b"ACGTACGT\x00", K, M); assert_eq!(out, Vec::>::new()); } #[test] fn empty_input_emits_nothing() { - let out = run_nofilter(b"", 4, 2); + let out = run_nofilter(b"", K, M); assert_eq!(out, Vec::>::new()); } #[test] fn two_segments_both_emitted() { - let out = run_nofilter(b"ACGTACGT\x00TTTTTTTT\x00", 4, 2); + let out = run_nofilter(b"ACGTACGTACGTACGT\x00TGCATGCATGCATGCA\x00", K, M); assert!(!out.is_empty()); } #[test] fn low_complexity_kmer_is_rejected() { - let out_pass = run_nofilter(b"AAAAAAAAACGT\x00", 4, 2); + let out_pass = run_nofilter(b"AAAAAAAAAAAACGTACGTACGT\x00", K, M); assert!(!out_pass.is_empty()); - let rope = make_rope(b"AAAAAAAA\x00"); - let out_reject: Vec> = SuperKmerIter::new(&rope, 4, 2, 6, 0.9) + let rope = make_rope(b"AAAAAAAAAAAAAAAAAAAA\x00"); + let out_reject: Vec> = SuperKmerIter::new(&rope, K, M, 6, 0.9) .map(|sk| sk.to_ascii()) .collect(); assert!(out_reject.is_empty()); @@ -208,12 +212,12 @@ mod tests { #[test] fn multi_slice_rope() { - let data = b"ACGTACGTACGT\x00"; + let data = b"ACGTACGTACGTACGTACGT\x00"; let mid = data.len() / 2; let mut rope = Rope::new(None); rope.push(data[..mid].to_vec()); rope.push(data[mid..].to_vec()); - let out: Vec> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0) + let out: Vec> = SuperKmerIter::new(&rope, K, M, 1, 0.0) .map(|sk| sk.to_ascii()) .collect(); assert!(!out.is_empty()); @@ -221,8 +225,8 @@ mod tests { #[test] fn yields_minimizer_value() { - let rope = make_rope(b"ACGTACGT\x00"); - let results: Vec = SuperKmerIter::new(&rope, 4, 2, 1, 0.0).collect(); + let rope = make_rope(b"ACGTACGTACGTACGTACGT\x00"); + let results: Vec = SuperKmerIter::new(&rope, K, M, 1, 0.0).collect(); assert!(!results.is_empty()); } } diff --git a/src/obiskbuilder/src/lib.rs b/src/obiskbuilder/src/lib.rs index 88fa27b..756c704 100644 --- a/src/obiskbuilder/src/lib.rs +++ b/src/obiskbuilder/src/lib.rs @@ -14,3 +14,11 @@ pub(crate) mod rolling_stat; pub use iter::SuperKmerIter; pub use scratch::SuperKmerScratch; + +use obikrope::Rope; +use obikseq::superkmer::SuperKmer; + +/// Collect all super-kmers from a normalised rope chunk. +pub fn build_superkmers(rope: Rope, k: usize, m: usize, level_max: usize, theta: f64) -> Vec { + SuperKmerIter::new(&rope, k, m, level_max, theta).collect() +}