6 Commits

Author SHA1 Message Date
Eric Coissac 86b88acb95 feat: implement approximate k-mer indexing and optimize query
Enable approximate k-mer indexing via the `--approx` flag, computing an effective k-mer size of `k - z + 1` and configuring the appropriate indexing mode with validated probabilistic parameters. Refactor the Findere z-window filter in the query command to improve performance and correctness by replacing the precomputed vector with a lazy closure, optimizing cache locality, and fixing a variable naming bug.
2026-05-29 09:20:44 +02:00
Eric Coissac be0e8f1041 refactor(query): parallelize query execution with obipipeline
Extracts chunk processing into a dedicated function and introduces a QueryData enum with unsafe Send/Sync implementations to safely distribute Rope chunks across worker threads. Replaces nested iteration with a flat iterator and parallel block processing. Adds CLI argument parsing for presence, threshold, and detail flags to configure the pipeline.
2026-05-29 09:18:54 +02:00
Eric Coissac eaa52eaab5 feat: introduce nucstream abstraction and comprehensive test suite
Introduces a unified NucStream abstraction with NucPageCursor for byte-offset tracking and MIME-type dispatch to instantiate format-specific parsers. Exposes nuc_stream and open_nuc_stream APIs that return boxed, Send-compatible iterators. Additionally, adds a comprehensive test suite covering chunk boundary alignment, FASTA/FASTQ record parsing, sequence normalization, and edge cases such as CRLF line endings, @ in quality strings, and multi-slice rope processing.
2026-05-29 09:17:24 +02:00
Eric Coissac cfadf63bbc refactor: migrate pipeline to NucPage-based stream processing
Replace the existing chunk and Rope-based processing pipeline with a fixed-size NucPage architecture. Introduce a new nucstream module featuring buffer-pooled, in-place parsing that auto-detects and decompresses FASTA/FASTQ/GenBank inputs into normalized ACGT streams with k-mer overlap preservation. Update obikmer scatter and superkmer stages to consume NucPage iterators and cursor-based navigation, eliminating std::io::Read dependencies and optimizing memory management. Add a configurable max_open_files CLI argument and update implementation documentation to reflect the new record vs. stream reading paths.
2026-05-29 09:10:25 +02:00
Eric Coissac a4b57a96de feat: add streaming sequence reader and superkmer iterator
Introduce the `obiread` crate with a streaming byte normalizer that processes FASTA, FASTQ, and GenBank files using a 64 KiB ring buffer for O(1) memory usage. Integrate this crate into `obiskbuilder` to provide `SuperKmerStreamIter`, enabling memory-efficient superkmer traversal with rolling entropy and minimizer-based cut conditions.
2026-05-29 09:10:25 +02:00
Eric Coissac 0d9be53d1f feat: enforce runtime validation for kmer and minimizer parameters
Introduces `CommonArgs::validate()` to enforce strict constraints on `--kmer-size` (odd, 11–31), `--minimizer-size` (odd, 3–k−1), and `z` (strictly less than k). This validation is applied at the entry point of the `superkmer` and `index` commands to prevent invalid configurations, avoid palindromes, prevent u64 overflow, and ensure positive effective indexing sizes. Documentation is updated to reflect these runtime checks and immediate termination on invalid input.
2026-05-29 09:10:25 +02:00
27 changed files with 1980 additions and 686 deletions
+19 -1
View File
@@ -1,6 +1,24 @@
# Chunk reader — implementation # Chunk reader — implementation
The `obiread` crate provides a streaming iterator that reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. Chunks are consumed in parallel by downstream workers. `obiread` exposes two distinct sequence reading paths, each optimised for a different use case.
## Two reading paths
| Path | API | Output unit | Per-record identity | Use case |
|------|-----|-------------|---------------------|----------|
| **Record path** | `read_sequence_chunks``parse_chunk` | `SeqRecord` (id + raw sequence + normalised rope) | yes | `query` — must read complete records |
| **Stream path** | `open_nuc_stream` | `NucPage` (flat normalised byte buffer) | no | `index`, `superkmer` — bulk throughput |
The record path uses `Rope`-backed chunks and is described in detail below.
The stream path (`NucStream` / `NucPage`) is described in the scatter section of [pipeline](pipeline.md).
---
## Record path: chunk reader
The chunk reader reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. `parse_chunk` then converts each chunk into a `Vec<SeqRecord>`, where each record carries its identifier, raw sequence bytes, and a normalised rope ready for superkmer building.
This path is mandatory for `query`, where superkmers must be tracked back to their originating sequence (id, kmer offset) for output annotation.
## Output type: Rope ## Output type: Rope
+5 -1
View File
@@ -19,7 +19,11 @@ The histogram gives:
## Phase 1 — Scatter ## Phase 1 — Scatter
Single streaming pass over raw input files (FASTA/FASTQ, gzip). FASTQ quality scores are ignored. For each read: Single streaming pass over raw input files (FASTA/FASTQ, gzip). FASTQ quality scores are ignored.
Input files are read via `open_nuc_stream`, which opens and decompresses the file, auto-detects the format (FASTA / FASTQ / GenBank), and yields a sequence of `NucPage` buffers. Each `NucPage` is a flat 64 KB buffer of normalised bytes (`ACGT` + `\x00` separators), carrying a k1 byte overlap from the preceding page so that no k-mer is lost at page boundaries. Per-record identity (sequence id, raw bytes) is not preserved; this is intentional — the scatter phase only needs normalised bases to produce superkmers.
For each read fragment within a page:
1. **Ambiguous base filter**: cut at any non-ACGT base; discard fragments shorter than k. 1. **Ambiguous base filter**: cut at any non-ACGT base; discard fragments shorter than k.
2. **Entropy filter**: scan each fragment with a sliding window of size k. When the kmer $K_i = S[i \mathinner{..} i+k-1]$ ended by nucleotide $S[j]$ (with $j = i+k-1$) has entropy below threshold $\theta$, emit the current segment and start a new one (see algorithm below). $K_i$ belongs to neither segment, and no valid kmer is lost. 2. **Entropy filter**: scan each fragment with a sliding window of size k. When the kmer $K_i = S[i \mathinner{..} i+k-1]$ ended by nucleotide $S[j]$ (with $j = i+k-1$) has entropy below threshold $\theta$, emit the current segment and start a new one (see algorithm below). $K_i$ belongs to neither segment, and no valid kmer is lost.
+12
View File
@@ -27,6 +27,18 @@
- Canonical form: `min(kmer, revcomp(kmer))` reduces strand-symmetric space by half - Canonical form: `min(kmer, revcomp(kmer))` reduces strand-symmetric space by half
- Input formats: FASTA, FASTQ, gzip, streaming stdin; `index` reads from stdin automatically when no input files are provided (`-` can also be passed explicitly among other paths) - Input formats: FASTA, FASTQ, gzip, streaming stdin; `index` reads from stdin automatically when no input files are provided (`-` can also be passed explicitly among other paths)
## Parameter constraints (enforced at CLI)
All constraints below are checked by `CommonArgs::validate()` at the start of `superkmer` and `index`. Invalid values exit immediately with an error.
| Parameter | Constraint | Reason |
|-----------|-----------|--------|
| k (`--kmer-size`) | odd | even k allows palindromic k-mers: kmer == revcomp(kmer), breaking the canonical form invariant |
| k (`--kmer-size`) | k ∈ [11, 31] | k > 31 overflows u64 at 2 bits/base; k < 11 gives insufficient specificity |
| m (`--minimizer-size`) | odd | same palindrome argument as k |
| m (`--minimizer-size`) | 3 ≤ m ≤ k1 | minimizer must be strictly shorter than the kmer |
| z (`-z`, Findere, `index --approx` only) | z ≤ k1 | effective indexed kmer size is kz+1; z ≥ k would make it ≤ 0 |
## Genome label constraints ## Genome label constraints
Genome labels are arbitrary Unicode strings with the following restrictions: Genome labels are arbitrary Unicode strings with the following restrictions:
+3 -1
View File
@@ -4,9 +4,11 @@
A **kmer** is a DNA subsequence of fixed length k. Two constraints govern the choice of k: A **kmer** is a DNA subsequence of fixed length k. Two constraints govern the choice of k:
- **k ∈ [11, 31]**: the range ensures the kmer is long enough to be specific and short enough to fit in a single machine word. - **k ∈ [11, 31]**: the range ensures the kmer is long enough to be specific and short enough to fit in a single machine word (u64 at 2 bits/base requires k ≤ 32; k < 11 yields insufficient specificity).
- **k is odd**: an odd-length sequence cannot equal its own reverse complement (no palindromes). This guarantees that the canonical form `min(kmer, revcomp(kmer))` is always strictly defined — the two orientations are always distinct — which is required for strand-independent counting. - **k is odd**: an odd-length sequence cannot equal its own reverse complement (no palindromes). This guarantees that the canonical form `min(kmer, revcomp(kmer))` is always strictly defined — the two orientations are always distinct — which is required for strand-independent counting.
Both constraints are **enforced at CLI entry** by `CommonArgs::validate()` in `superkmer` and `index`. Passing an invalid k exits immediately with an error message.
## Super-kmers ## Super-kmers
A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k1 nucleotides, sharing the same **canonical minimizer**. The **canonical minimizer** of a kmer is the m-mer (m < k) whose canonical hash `hash_kmer(min(m-mer, revcomp(m-mer)))` is smallest over all m-mers in the kmer window. The hash function is a `mix64`-based bijection; selection is purely hash-ordered with no degeneracy filter. A super-kmer is capped at 256 nucleotides; a longer run is split at that boundary. A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k1 nucleotides, sharing the same **canonical minimizer**. The **canonical minimizer** of a kmer is the m-mer (m < k) whose canonical hash `hash_kmer(min(m-mer, revcomp(m-mer)))` is smallest over all m-mers in the kmer window. The hash function is a `mix64`-based bijection; selection is purely hash-ordered with no degeneracy filter. A super-kmer is capped at 256 nucleotides; a longer run is split at that boundary.
+1
View File
@@ -1637,6 +1637,7 @@ dependencies = [
"lazy_static", "lazy_static",
"obikrope", "obikrope",
"obikseq", "obikseq",
"obiread",
] ]
[[package]] [[package]]
+36 -24
View File
@@ -1,9 +1,8 @@
use std::io;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex}; use std::sync::{Arc, Condvar, Mutex};
use clap::Args; use clap::Args;
use obikrope::Rope; use obiread::NucPage;
use obikseq::RoutableSuperKmer; use obikseq::RoutableSuperKmer;
// ── Shared arguments ────────────────────────────────────────────────────────── // ── Shared arguments ──────────────────────────────────────────────────────────
@@ -45,9 +44,11 @@ pub struct CommonArgs {
)] )]
pub threads: usize, pub threads: usize,
/// Maximum number of input files open simultaneously /// Maximum number of input files open simultaneously.
#[arg(long, default_value_t = 20)] /// Defaults to threads/4 (minimum 1). Keep below the number of workers
pub max_open_files: usize, /// to ensure CPU workers are always available for the transform stage.
#[arg(long)]
pub max_open_files: Option<usize>,
} }
/// Smallest `b` such that `2^b >= n` (i.e. `n.next_power_of_two().ilog2()`). /// Smallest `b` such that `2^b >= n` (i.e. `n.next_power_of_two().ilog2()`).
@@ -63,6 +64,35 @@ pub fn block_size_to_bits(n: usize) -> u8 {
} }
impl CommonArgs { impl CommonArgs {
/// Validate k and m constraints. Exits on error.
pub fn validate(&self) {
let k = self.kmer_size;
let m = self.minimizer_size;
if k < 11 || k > 31 {
eprintln!("error: --kmer-size must be in [11, 31] (got {k})");
std::process::exit(1);
}
if k % 2 == 0 {
eprintln!("error: --kmer-size must be odd (got {k}); even k allows palindromic k-mers");
std::process::exit(1);
}
if m < 3 || m >= k {
eprintln!("error: --minimizer-size must be in [3, k1] = [3, {}] (got {m})", k - 1);
std::process::exit(1);
}
if m % 2 == 0 {
eprintln!("error: --minimizer-size must be odd (got {m})");
std::process::exit(1);
}
}
pub fn effective_max_open(&self) -> usize {
self.max_open_files
.unwrap_or_else(|| (self.threads / 4).max(1))
.max(1)
}
pub fn seqfile_paths(&self) -> obiread::PathIter { pub fn seqfile_paths(&self) -> obiread::PathIter {
let paths: Vec<PathBuf> = if self.inputs.is_empty() { let paths: Vec<PathBuf> = if self.inputs.is_empty() {
vec![PathBuf::from("-")] vec![PathBuf::from("-")]
@@ -121,13 +151,10 @@ pub struct PathWithSlot {
pub enum PipelineData { pub enum PipelineData {
Path(PathWithSlot), Path(PathWithSlot),
RawChunk(Rope), NucPage(NucPage),
NormChunk(Rope),
Batch(Vec<RoutableSuperKmer>), Batch(Vec<RoutableSuperKmer>),
} }
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline ownership transfers
// exclusively through channels — no item is ever shared across threads.
unsafe impl Send for PipelineData {} unsafe impl Send for PipelineData {}
unsafe impl Sync for PipelineData {} unsafe impl Sync for PipelineData {}
@@ -148,18 +175,3 @@ pub fn throttle_paths(
}) })
} }
// ── I/O plumbing ──────────────────────────────────────────────────────────────
pub fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
let path_str = path
.to_str()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?;
let iter = obiread::read_sequence_chunks(path_str)?;
Ok(iter.filter_map(|r| match r {
Ok(rope) => Some(rope),
Err(e) => {
eprintln!("chunk read error: {e}");
None
}
}))
}
+18 -6
View File
@@ -152,16 +152,28 @@ pub(crate) fn resolve_approx_params(
} }
pub fn run(args: IndexArgs) { pub fn run(args: IndexArgs) {
args.common.validate();
let output = args.output.clone(); let output = args.output.clone();
let mut rep = Reporter::new(); let mut rep = Reporter::new();
// ── Resolve evidence kind ──────────────────────────────────────────────── // ── Resolve evidence kind ────────────────────────────────────────────────
let evidence = if args.approx { let (evidence, effective_kmer_size) = if args.approx {
let (z, b, fp) = resolve_approx_params(args.findere_z, args.evidence_bits, args.fp); let (z, b, fp) = resolve_approx_params(args.findere_z, args.evidence_bits, args.fp);
info!("approximate evidence: b={b}, z={z}, fp={fp:.2e}"); let k = args.common.kmer_size;
IndexMode::Approx { b, z } if z as usize >= k {
eprintln!(
"error: Findere z={z} must be < kmer-size={k} \
(effective kmer size kz+1 = {} ≤ 0)",
k as isize - z as isize + 1
);
std::process::exit(1);
}
let s = k - z as usize + 1;
info!("approximate evidence: b={b}, z={z}, fp={fp:.2e}, indexed kmer size={s}");
(IndexMode::Approx { b, z }, s)
} else { } else {
IndexMode::Exact (IndexMode::Exact, args.common.kmer_size)
}; };
// ── Open or create the index ───────────────────────────────────────────── // ── Open or create the index ─────────────────────────────────────────────
@@ -186,7 +198,7 @@ pub fn run(args: IndexArgs) {
} }
let block_bits = block_size_to_bits(args.block_size); let block_bits = block_size_to_bits(args.block_size);
let config = IndexConfig { let config = IndexConfig {
kmer_size: args.common.kmer_size, kmer_size: effective_kmer_size,
minimizer_size: args.common.minimizer_size, minimizer_size: args.common.minimizer_size,
n_bits, n_bits,
with_counts: args.with_counts, with_counts: args.with_counts,
@@ -220,7 +232,7 @@ pub fn run(args: IndexArgs) {
let theta = args.common.theta; let theta = args.common.theta;
let n_workers = args.common.threads.max(1); let n_workers = args.common.threads.max(1);
let max_open = args.common.max_open_files.max(1); let max_open = args.common.effective_max_open();
scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, max_open, &mut rep); scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, max_open, &mut rep);
idx.mark_scattered().unwrap_or_else(|e| { idx.mark_scattered().unwrap_or_else(|e| {
+173 -135
View File
@@ -1,16 +1,30 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{self, BufWriter, Write}; use std::io::{self, BufWriter, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use clap::Args; use clap::Args;
use obikindex::KmerIndex; use obikindex::KmerIndex;
use obilayeredmap::IndexMode; use obilayeredmap::IndexMode;
use obiread::record::{SeqRecord, parse_chunk};
use obiread::chunk::read_sequence_chunks; use obiread::chunk::read_sequence_chunks;
use obiread::record::{SeqRecord, parse_chunk};
use obikrope::Rope;
use obikseq::{RoutableSuperKmer, set_k, set_m}; use obikseq::{RoutableSuperKmer, set_k, set_m};
use obiskbuilder::SuperKmerIter; use obiskbuilder::SuperKmerIter;
use tracing::info; use tracing::info;
// ── Pipeline data ─────────────────────────────────────────────────────────────
enum QueryData {
Chunk(Rope),
Output(Vec<u8>),
}
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline items are owned
// exclusively through channels — no item is ever shared across threads.
unsafe impl Send for QueryData {}
unsafe impl Sync for QueryData {}
// ── CLI ─────────────────────────────────────────────────────────────────────── // ── CLI ───────────────────────────────────────────────────────────────────────
#[derive(Args)] #[derive(Args)]
@@ -146,72 +160,153 @@ impl SeqAcc {
// ── Findere z-window filter ─────────────────────────────────────────────────── // ── Findere z-window filter ───────────────────────────────────────────────────
/// Apply the Findere z-window filter to per-kmer query results for one superkmer. /// Apply the Findere z-window filter to per-kmer query results for one superkmer.
/// Aggregate s-mer query results into k-mer answers using a Findere z-window.
/// ///
/// A k-mer at position i for genome g is confirmed only if it belongs to at least /// Input: N s-mer results (indexed kmer size s = k z + 1).
/// one run of z consecutive positions where all k-mers are present for g. /// Output: N z + 1 k-mer results (user kmer size k).
/// Unconfirmed positions are zeroed; positions whose entire row becomes zero are
/// returned as `None`.
/// ///
/// When z <= 1 or the superkmer is shorter than z k-mers, results are returned /// For each genome g independently: k-mer at position i is confirmed iff all z values
/// unchanged (short superkmers cannot satisfy the z-window constraint). /// results[i..i+z][g] are nonzero (None counts as zero for all genomes).
/// Output values are taken from results[i]; genomes not confirmed are zeroed.
fn apply_findere( fn apply_findere(
results: &[Option<Box<[u32]>>], results: &[Option<Box<[u32]>>],
z: usize, z: usize,
n_genomes: usize, n_genomes: usize,
) -> Vec<Option<Box<[u32]>>> { ) -> Vec<Option<Box<[u32]>>> {
let n = results.len(); let n = results.len();
if z <= 1 || n < z { if z <= 1 {
return results.iter().map(|r| r.as_ref().map(|row| row.clone())).collect(); return results.iter().map(|r| r.as_ref().map(|row| row.clone())).collect();
} }
if n < z {
return Vec::new();
}
let mut confirmed = vec![vec![false; n_genomes]; n]; let out_n = n - z + 1;
let mut confirmed = vec![vec![false; n_genomes]; out_n];
for g in 0..n_genomes { for g in 0..n_genomes {
let present: Vec<bool> = results let hit = |i: usize| results[i].as_ref().map_or(false, |r| r[g] > 0);
.iter()
.map(|r| r.as_ref().map_or(false, |row| row[g] > 0))
.collect();
let mut window_count = present[..z].iter().filter(|&&p| p).count(); let mut count: u32 = (0..z).filter(|&j| hit(j)).count() as u32;
if window_count == z { if count == z as u32 { confirmed[0][g] = true; }
for c in confirmed[..z].iter_mut() {
c[g] = true; for i in 1..out_n {
} if hit(i - 1) { count -= 1; }
if hit(i + z - 1) { count += 1; }
if count == z as u32 { confirmed[i][g] = true; }
}
}
(0..out_n).map(|i| {
let first = results[i].as_ref()?;
let mut row: Box<[u32]> = first.clone();
for g in 0..n_genomes {
if !confirmed[i][g] { row[g] = 0; }
}
if row.iter().any(|&v| v > 0) { Some(row) } else { None }
}).collect()
}
// ── process_chunk ─────────────────────────────────────────────────────────────
fn process_chunk(
idx: &KmerIndex,
rope: Rope,
k: usize,
n_genomes: usize,
n_partitions: usize,
with_counts: bool,
effective_z: usize,
detail: bool,
count_missing: bool,
force_presence: bool,
presence_threshold: u32,
) -> Vec<u8> {
let records = parse_chunk(&rope, k);
if records.is_empty() {
return Vec::new();
}
let batch = QueryBatch::from_records(records, k, 6, 0.7);
let n_seqs = batch.ids.len();
let mut accs: Vec<SeqAcc> =
(0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
let mut cov: Vec<Vec<Vec<u32>>> = if detail {
batch.n_kmers.iter()
.map(|&n| vec![vec![0u32; n as usize]; n_genomes])
.collect()
} else {
Vec::new()
};
let by_part = batch.split_by_partition(n_partitions);
for (part_idx, part_sks) in by_part.iter().enumerate() {
if part_sks.is_empty() {
continue;
} }
for j in 1..=(n - z) { let kmer_results = idx
if present[j - 1] { window_count -= 1; } .partition()
if present[j + z - 1] { window_count += 1; } .query_partition(part_idx, part_sks, k, n_genomes, with_counts)
if window_count == z { .unwrap_or_else(|e| {
for c in confirmed[j..j + z].iter_mut() { eprintln!("query error on partition {part_idx}: {e}");
c[g] = true; std::process::exit(1);
});
let presence = force_presence || !with_counts;
let threshold = presence_threshold;
for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) {
let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes);
let descs = batch.map.get(*rsk).expect("rsk must be in map");
for desc in descs {
let acc = &mut accs[desc.seq_idx as usize];
for (local_pos, hit) in filtered.iter().enumerate() {
match hit {
None => {
if sk_kmer_results[local_pos].is_none() {
acc.kmer_missing += 1;
}
}
Some(row) => {
acc.kmer_count += 1;
for (g, &v) in row.iter().enumerate() {
if v == 0 { continue; }
let contribution = if presence {
u32::from(v >= threshold)
} else {
v
};
acc.genome_totals[g] += contribution;
if detail {
let abs_pos = desc.kmer_offset as usize + local_pos;
cov[desc.seq_idx as usize][g][abs_pos] += contribution;
}
}
}
}
} }
} }
} }
} }
results.iter().enumerate().map(|(i, opt)| { let mut buf = Vec::new();
let row = opt.as_ref()?; emit_batch(&batch, &accs, idx.meta(), count_missing, detail, &cov, &mut buf);
let mut new_row: Box<[u32]> = row.clone(); buf
let mut any = false;
for g in 0..n_genomes {
if !confirmed[i][g] {
new_row[g] = 0;
} else {
any = true;
}
}
if any { Some(new_row) } else { None }
}).collect()
} }
// ── Entry point ─────────────────────────────────────────────────────────────── // ── Entry point ───────────────────────────────────────────────────────────────
pub fn run(args: QueryArgs) { pub fn run(args: QueryArgs) {
let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| { let idx = Arc::new(KmerIndex::open(&args.index).unwrap_or_else(|e| {
eprintln!("error opening index: {e}"); eprintln!("error opening index: {e}");
std::process::exit(1); std::process::exit(1);
}); }));
set_k(idx.kmer_size()); set_k(idx.kmer_size());
set_m(idx.minimizer_size()); set_m(idx.minimizer_size());
@@ -220,6 +315,7 @@ pub fn run(args: QueryArgs) {
let n_genomes = idx.meta().genomes.len(); let n_genomes = idx.meta().genomes.len();
let n_partitions = idx.n_partitions(); let n_partitions = idx.n_partitions();
let with_counts = idx.meta().config.with_counts; let with_counts = idx.meta().config.with_counts;
let n_workers = args.threads.max(1);
let effective_z: usize = args.findere_z.unwrap_or_else(|| { let effective_z: usize = args.findere_z.unwrap_or_else(|| {
match idx.meta().config.evidence { match idx.meta().config.evidence {
@@ -238,106 +334,48 @@ pub fn run(args: QueryArgs) {
eprintln!("warning: --mismatch not yet implemented, ignored"); eprintln!("warning: --mismatch not yet implemented, ignored");
} }
let detail = args.detail;
let count_missing = args.count_missing;
let force_presence = args.force_presence;
let presence_threshold = args.presence_threshold;
// Flat iterator over all Rope chunks from all input files.
// I/O runs in the source thread; chunk processing is parallelised by the pipe.
let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect(); let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect();
let all_chunks = paths.into_iter().flat_map(|path| {
let path_str = path.to_str().unwrap_or("").to_owned();
match read_sequence_chunks(&path_str) {
Ok(iter) => Box::new(iter.filter_map(|r| match r {
Ok(rope) => Some(rope),
Err(e) => { eprintln!("read error: {e}"); None }
})) as Box<dyn Iterator<Item = Rope> + Send>,
Err(e) => {
eprintln!("error opening {path_str}: {e}");
std::process::exit(1);
}
}
});
let pipe = obipipeline::make_pipe! {
QueryData : Rope => Vec<u8>,
| {
let idx = Arc::clone(&idx);
move |rope: Rope| {
process_chunk(
&idx, rope, k, n_genomes, n_partitions, with_counts, effective_z,
detail, count_missing, force_presence, presence_threshold,
)
}
} : Chunk => Output,
};
let mut out = BufWriter::new(io::stdout()); let mut out = BufWriter::new(io::stdout());
for block in pipe.apply(all_chunks, n_workers, 2) {
for path in &paths { if !block.is_empty() {
let chunks = read_sequence_chunks(path.to_str().unwrap_or("")) out.write_all(&block).expect("write error");
.unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", path.display());
std::process::exit(1);
});
for chunk_result in chunks {
let chunk = chunk_result.unwrap_or_else(|e| {
eprintln!("read error: {e}");
std::process::exit(1);
});
let records = parse_chunk(&chunk, k);
if records.is_empty() {
continue;
}
let batch = QueryBatch::from_records(records, k, 6, 0.7);
let n_seqs = batch.ids.len();
let mut accs: Vec<SeqAcc> =
(0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
// [seq_idx][genome_idx][kmer_position] — allocated only with --detail
let mut cov: Vec<Vec<Vec<u32>>> = if args.detail {
batch.n_kmers.iter()
.map(|&n| vec![vec![0u32; n as usize]; n_genomes])
.collect()
} else {
Vec::new()
};
let by_part = batch.split_by_partition(n_partitions);
for (part_idx, part_sks) in by_part.iter().enumerate() {
if part_sks.is_empty() {
continue;
}
let kmer_results = idx
.partition()
.query_partition(part_idx, part_sks, k, n_genomes, with_counts)
.unwrap_or_else(|e| {
eprintln!("query error on partition {part_idx}: {e}");
std::process::exit(1);
});
let presence = args.force_presence || !with_counts;
let threshold = args.presence_threshold;
for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) {
let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes);
let descs = batch.map.get(*rsk).expect("rsk must be in map");
for desc in descs {
let acc = &mut accs[desc.seq_idx as usize];
for (local_pos, hit) in filtered.iter().enumerate() {
match hit {
None => {
// Only truly missing if the index also had no entry.
if sk_kmer_results[local_pos].is_none() {
acc.kmer_missing += 1;
}
}
Some(row) => {
acc.kmer_count += 1;
for (g, &v) in row.iter().enumerate() {
if v == 0 {
continue;
}
let contribution = if presence {
u32::from(v >= threshold)
} else {
v
};
acc.genome_totals[g] += contribution;
if args.detail {
let abs_pos = desc.kmer_offset as usize + local_pos;
cov[desc.seq_idx as usize][g][abs_pos] += contribution;
}
}
}
}
}
}
}
}
emit_batch(
&batch, &accs, idx.meta(),
args.count_missing, args.detail, &cov,
&mut out,
);
} }
} }
out.flush().expect("flush error");
} }
// ── Output ──────────────────────────────────────────────────────────────────── // ── Output ────────────────────────────────────────────────────────────────────
+12 -5
View File
@@ -4,7 +4,7 @@ use clap::Args;
use obifastwrite::write_scatter; use obifastwrite::write_scatter;
use obikseq::{RoutableSuperKmer, set_k, set_m}; use obikseq::{RoutableSuperKmer, set_k, set_m};
use crate::cli::{CommonArgs, PipelineData, PathWithSlot, open_chunks, partitions_to_bits, throttle_paths}; use crate::cli::{CommonArgs, PipelineData, PathWithSlot, partitions_to_bits, throttle_paths};
#[derive(Args)] #[derive(Args)]
pub struct SuperkmerArgs { pub struct SuperkmerArgs {
@@ -33,13 +33,15 @@ fn write_batch(
// ── Entry point ─────────────────────────────────────────────────────────────── // ── Entry point ───────────────────────────────────────────────────────────────
pub fn run(args: SuperkmerArgs) { pub fn run(args: SuperkmerArgs) {
args.common.validate();
let k = args.common.kmer_size; let k = args.common.kmer_size;
let m = args.common.minimizer_size; let m = args.common.minimizer_size;
let theta = args.common.theta; let theta = args.common.theta;
let level_max = args.common.level_max; let level_max = args.common.level_max;
let partition_bits = partitions_to_bits(args.common.partitions); let partition_bits = partitions_to_bits(args.common.partitions);
let n_workers = args.common.threads.max(1); let n_workers = args.common.threads.max(1);
let max_open = args.common.max_open_files.max(1); let max_open = args.common.effective_max_open();
set_k(k); set_k(k);
set_m(m); set_m(m);
@@ -48,9 +50,14 @@ pub fn run(args: SuperkmerArgs) {
let pipe = obipipeline::make_pipe! { let pipe = obipipeline::make_pipe! {
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>, PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
||? { |pw: PathWithSlot| open_chunks(pw.path) } : Path => RawChunk, ||? {
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, let k = k;
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch, move |pw: PathWithSlot| {
let path_str = pw.path.to_str().unwrap_or("").to_owned();
obiread::open_nuc_stream(&path_str, k)
}
} : Path => NucPage,
| { move |page| obiskbuilder::build_superkmers_page(page, k, level_max, theta) } : NucPage => Batch,
}; };
let mut out = BufWriter::new(io::stdout()); let mut out = BufWriter::new(io::stdout());
+39 -17
View File
@@ -1,30 +1,37 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use obiread::NucPage;
use obikpartitionner::KmerPartition; use obikpartitionner::KmerPartition;
use obikrope::Rope;
use obisys::{Reporter, Stage}; use obisys::{Reporter, Stage};
use tracing::info; use tracing::info;
use crate::cli::{PipelineData, PathWithSlot, open_chunks, throttle_paths}; use crate::cli::{PipelineData, PathWithSlot, throttle_paths};
// ── Iterator that keeps the slot guard alive until the file is exhausted ────── // ── Iterator that keeps the slot guard alive until the file is exhausted ──────
struct GuardedIter<I> { struct GuardedIter {
inner: I, inner: Box<dyn Iterator<Item = NucPage> + Send>,
_guard: Box<dyn Send + 'static>, _guard: Box<dyn Send + 'static>,
flat_active: Arc<AtomicU32>,
} }
impl<I: Iterator<Item = Rope>> Iterator for GuardedIter<I> { impl Iterator for GuardedIter {
type Item = Rope; type Item = NucPage;
fn next(&mut self) -> Option<Rope> { fn next(&mut self) -> Option<NucPage> {
self.inner.next() self.inner.next()
} }
} }
impl Drop for GuardedIter {
fn drop(&mut self) {
self.flat_active.fetch_sub(1, Ordering::Relaxed);
}
}
// ── scatter ─────────────────────────────────────────────────────────────────── // ── scatter ───────────────────────────────────────────────────────────────────
/// Run scatter: normalise → build superkmers → route to partition → close. /// Run scatter: normalise → build superkmers → route to partition → close.
@@ -44,23 +51,36 @@ pub fn scatter(
// Throttle in the source thread — never in a worker — to prevent deadlock. // Throttle in the source thread — never in a worker — to prevent deadlock.
let throttled = throttle_paths(path_source, max_open); let throttled = throttle_paths(path_source, max_open);
let file_count = Arc::new(AtomicU64::new(0)); let file_count = Arc::new(AtomicU64::new(0));
let flat_active = Arc::new(AtomicU32::new(0));
let transform_active = Arc::new(AtomicU32::new(0));
let t = Stage::start("scatter"); let t = Stage::start("scatter");
let pipe = obipipeline::make_pipe! { let pipe = obipipeline::make_pipe! {
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>, PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
||? { ||? {
let file_count = Arc::clone(&file_count); let file_count = Arc::clone(&file_count);
let flat_active = Arc::clone(&flat_active);
let k = k;
move |pw: PathWithSlot| { move |pw: PathWithSlot| {
let PathWithSlot { path, _guard } = pw; let PathWithSlot { path, _guard } = pw;
let n = file_count.fetch_add(1, Ordering::Relaxed) + 1; let n = file_count.fetch_add(1, Ordering::Relaxed) + 1;
info!("indexing [{}]: {}", n, path.display()); info!("indexing [{}]: {}", n, path.display());
// _guard travels into GuardedIter; released when all chunks are read let path_str = path.to_str().unwrap_or("").to_owned();
open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard }) flat_active.fetch_add(1, Ordering::Relaxed);
obiread::open_nuc_stream(&path_str, k)
.map(|iter| GuardedIter { inner: iter, _guard, flat_active: Arc::clone(&flat_active) })
} }
} : Path => RawChunk, } : Path => NucPage,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, | {
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch, let transform_active = Arc::clone(&transform_active);
move |page| {
transform_active.fetch_add(1, Ordering::Relaxed);
let result = obiskbuilder::build_superkmers_page(page, k, level_max, theta);
transform_active.fetch_sub(1, Ordering::Relaxed);
result
}
} : NucPage => Batch,
}; };
let pb = ProgressBar::new_spinner(); let pb = ProgressBar::new_spinner();
@@ -93,7 +113,9 @@ pub fn scatter(
(format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6)) (format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6))
}; };
let n_files = file_count.load(Ordering::Relaxed); let n_files = file_count.load(Ordering::Relaxed);
pb.set_message(format!("{count_str} {rate_str} {n_files} files")); let r = flat_active.load(Ordering::Relaxed);
let c = transform_active.load(Ordering::Relaxed);
pb.set_message(format!("{count_str} {rate_str} {n_files} files [R:{r} C:{c}]"));
} }
kp.write_batch(batch).unwrap_or_else(|e| { kp.write_batch(batch).unwrap_or_else(|e| {
eprintln!("error: {e}"); eprintln!("error: {e}");
+2 -108
View File
@@ -191,111 +191,5 @@ pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
} }
#[cfg(test)] #[cfg(test)]
mod tests { #[path = "tests/chunk.rs"]
use super::*; mod tests;
use crate::fasta::end_of_last_fasta_entry;
use crate::fastq::end_of_last_fastq_entry;
fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fasta_entry, None)
}
fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fastq_entry, None)
}
fn rope_to_vec(rope: &Rope) -> Vec<u8> {
rope.fw_cursor().collect()
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test]
fn fasta_single_record_one_chunk() {
let data: &[u8] = b">s1\nACGT\n";
let chunks: Vec<_> = fasta_iter(data, 64).collect::<Result<_, _>>().unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(rope_to_vec(&chunks[0]), b">s1\nACGT\n");
}
#[test]
fn fasta_two_records_split_across_chunks() {
let data: &[u8] = b">s1\nACGT\n>s2\nTTTT\n";
let chunks: Vec<_> = fasta_iter(data, 10).collect::<Result<_, _>>().unwrap();
let all: Vec<u8> = chunks.iter().flat_map(|r| rope_to_vec(r)).collect();
assert_eq!(all, b">s1\nACGT\n>s2\nTTTT\n");
}
#[test]
fn fasta_each_chunk_ends_on_complete_record() {
let data: &[u8] = b">s1\nACGT\n>s2\nCCCC\n>s3\nGGGG\n>s4\nTTTT\n";
for block in [8, 12, 20, 100] {
let chunks: Vec<_> = fasta_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks {
let flat = rope_to_vec(rope);
assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'");
assert_eq!(
*flat.last().unwrap(),
b'\n',
"block={block}: chunk doesn't end with newline"
);
}
}
}
// ── FASTQ ─────────────────────────────────────────────────────────────────
fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (seq, qual) in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(qual);
buf.push(b'\n');
}
buf
}
#[test]
fn fastq_single_record_one_chunk() {
let data = Box::leak(make_fastq(&[(b"ACGT", b"IIII")]).into_boxed_slice());
let chunks: Vec<_> = fastq_iter(data, 64).collect::<Result<_, _>>().unwrap();
assert_eq!(chunks.len(), 1);
}
#[test]
fn fastq_at_in_quality_handled() {
let data = Box::leak(
make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTTTTTT", b"HHHHHHHH")])
.into_boxed_slice(),
);
let chunks: Vec<_> = fastq_iter(data, 16).collect::<Result<_, _>>().unwrap();
let all: Vec<u8> = chunks.iter().flat_map(|r| rope_to_vec(r)).collect();
assert_eq!(all, *data);
}
#[test]
fn fastq_each_chunk_starts_with_at() {
let data = Box::leak(
make_fastq(&[
(b"ACGT", b"IIII"),
(b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"),
(b"TTTT", b"LLLL"),
])
.into_boxed_slice(),
);
for block in [18, 30, 60] {
let chunks: Vec<_> = fastq_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks {
let first_byte = rope_to_vec(rope)[0];
assert_eq!(
first_byte, b'@',
"block={block}: chunk doesn't start with '@'"
);
}
}
}
}
+2 -68
View File
@@ -35,71 +35,5 @@ pub fn end_of_last_fasta_entry(rope: &Rope) -> Option<usize> {
} }
#[cfg(test)] #[cfg(test)]
mod tests { #[path = "tests/fasta.rs"]
use super::*; mod tests;
fn rope(data: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn rope2(a: &[u8], b: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(a.to_vec());
r.push(b.to_vec());
r
}
fn flat(r: &Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
#[test]
fn single_entry_no_boundary() {
assert_eq!(end_of_last_fasta_entry(&rope(b">seq1\nACGT\n")), None);
}
#[test]
fn two_entries_cuts_at_second_header() {
let data = b">seq1\nACGT\n>seq2\nTTTT\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">seq2\nTTTT\n");
assert_eq!(&flat(&r)[..pos], b">seq1\nACGT\n");
}
#[test]
fn three_entries_cuts_at_last_header() {
let data = b">s1\nAA\n>s2\nCC\n>s3\nGG\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s3\nGG\n");
}
#[test]
fn multiline_sequence() {
let data = b">s1\nACGT\nACGT\n>s2\nTTTT\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s2\nTTTT\n");
}
#[test]
fn crlf_line_endings() {
let data = b">s1\r\nACGT\r\n>s2\r\nTTTT\r\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s2\r\nTTTT\r\n");
}
#[test]
fn boundary_spans_two_blocks() {
let a = b">s1\nACGT\n";
let b = b">s2\nTTTT\n";
let r = rope2(a, b);
let all: Vec<u8> = flat(&r);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&all[pos..], b">s2\nTTTT\n");
}
}
+2 -75
View File
@@ -107,78 +107,5 @@ pub fn end_of_last_fastq_entry(rope: &Rope) -> Option<usize> {
} }
#[cfg(test)] #[cfg(test)]
mod tests { #[path = "tests/fastq.rs"]
use super::*; mod tests;
fn rope(data: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (seq, qual) in records {
buf.extend_from_slice(b"@header\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(qual);
buf.push(b'\n');
}
buf
}
fn flat(r: &Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
#[test]
fn single_record_no_boundary() {
let buf = make_fastq(&[(b"ACGT", b"IIII")]);
assert_eq!(end_of_last_fastq_entry(&rope(&buf)), None);
}
#[test]
fn two_records_cuts_at_second() {
let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"TTTT", b"HHHH")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(flat(&r)[pos], b'@');
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()
);
}
#[test]
fn three_records_cuts_at_last() {
let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"CCCC", b"JJJJ"), (b"GGGG", b"KKKK")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"GGGG", b"KKKK")]).as_slice()
);
}
#[test]
fn at_sign_in_quality_does_not_confuse() {
let buf = make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTT", b"HHHH")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()
);
}
#[test]
fn crlf_line_endings() {
let data = b"@h\r\nACGT\r\n+\r\nIIII\r\n@h\r\nTTTT\r\n+\r\nHHHH\r\n";
let r = rope(data);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(flat(&r)[pos], b'@');
assert_eq!(&flat(&r)[pos..], b"@h\r\nTTTT\r\n+\r\nHHHH\r\n");
}
}
+7 -3
View File
@@ -10,15 +10,19 @@ mod fasta;
mod fastq; mod fastq;
mod mimetype; mod mimetype;
pub mod normalize; pub mod normalize;
mod nucstream;
mod path_iterator; mod path_iterator;
pub mod peakreader; pub mod peakreader;
pub mod record; pub mod record;
pub mod xopen; pub mod xopen;
pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks, pub use chunk::{
read_fasta_chunks, read_fastq_chunks, read_sequence_chunks}; SeqChunkIter, fasta_chunks, fastq_chunks, read_fasta_chunks, read_fastq_chunks,
pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk}; read_sequence_chunks,
};
pub use mimetype::MimeTypeGuesser; pub use mimetype::MimeTypeGuesser;
pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk};
pub use nucstream::{NucPage, NucPageCursor, open_nuc_stream};
pub use path_iterator::{PathIter, path_iter}; pub use path_iterator::{PathIter, path_iter};
pub use peakreader::PeekReader; pub use peakreader::PeekReader;
pub use xopen::xopen; pub use xopen::xopen;
+5
View File
@@ -9,6 +9,10 @@ use crate::peakreader::PeekReader;
const BUF_SIZE: usize = 4096; const BUF_SIZE: usize = 4096;
fn is_gbff(buf: &[u8]) -> bool {
buf.starts_with(b"LOCUS ")
}
static RE_FASTA: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^>[^ ]").unwrap()); static RE_FASTA: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^>[^ ]").unwrap());
fn is_fasta(buf: &[u8]) -> bool { fn is_fasta(buf: &[u8]) -> bool {
std::str::from_utf8(buf).map_or(false, |s| RE_FASTA.is_match(s)) std::str::from_utf8(buf).map_or(false, |s| RE_FASTA.is_match(s))
@@ -30,6 +34,7 @@ fn is_text(buf: &[u8]) -> bool {
// Most specific formats (fastq, fasta) come before the generic text/plain fallback. // Most specific formats (fastq, fasta) come before the generic text/plain fallback.
static INFER: LazyLock<Infer> = LazyLock::new(|| { static INFER: LazyLock<Infer> = LazyLock::new(|| {
let mut infer = Infer::new(); let mut infer = Infer::new();
infer.add("text/gbff", "gbff", is_gbff);
infer.add("text/fastq", "fastq", is_fastq); infer.add("text/fastq", "fastq", is_fastq);
infer.add("text/fasta", "fasta", is_fasta); infer.add("text/fasta", "fasta", is_fasta);
infer.add("text/plain", "txt", is_text); infer.add("text/plain", "txt", is_text);
+2 -236
View File
@@ -215,239 +215,5 @@ fn is_acgt(upper: u8) -> bool {
// ── tests ───────────────────────────────────────────────────────────────────── // ── tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)] #[cfg(test)]
mod tests { #[path = "tests/normalize.rs"]
use super::*; mod tests;
fn make_rope(data: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn flat(r: Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
fn run_fastq(data: &[u8], k: usize) -> Vec<u8> {
flat(normalize_fastq_chunk(make_rope(data), k))
}
fn run_fasta(data: &[u8], k: usize) -> Vec<u8> {
flat(normalize_fasta_chunk(make_rope(data), k))
}
fn make_fastq(records: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::new();
for seq in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(&vec![b'I'; seq.len()]);
buf.push(b'\n');
}
buf
}
fn make_fasta(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (id, seq) in records {
buf.push(b'>');
buf.extend_from_slice(id);
buf.push(b'\n');
buf.extend_from_slice(seq);
buf.push(b'\n');
}
buf
}
// ── FASTQ basic ──────────────────────────────────────────────────────────
#[test]
fn single_record_produces_seq_then_null() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGT"]), 4), b"ACGTACGT\x00");
}
#[test]
fn two_records_concatenated() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]), 4),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn lowercase_input_uppercased() {
assert_eq!(run_fastq(&make_fastq(&[b"acgtacgt"]), 4), b"ACGTACGT\x00");
}
#[test]
fn mixed_case_uppercased() {
assert_eq!(run_fastq(&make_fastq(&[b"AcGtAcGt"]), 4), b"ACGTACGT\x00");
}
#[test]
fn sequence_shorter_than_k_discarded() {
assert_eq!(run_fastq(&make_fastq(&[b"ACG"]), 4), b"");
}
#[test]
fn sequence_exactly_k_kept() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGT"]), 4), b"ACGT\x00");
}
#[test]
fn short_record_among_valid_ones_discarded() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGT", b"AC", b"TTTTTTTT"]), 4),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn ambiguous_splits_into_two_segments() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTNACGT"]), 4),
b"ACGT\x00ACGT\x00"
);
}
#[test]
fn segment_after_ambiguous_too_short_discarded() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGTNAC"]), 4),
b"ACGTACGT\x00"
);
}
#[test]
fn consecutive_ambiguous_produce_no_empty_segment() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTNNNNACGT"]), 4),
b"ACGT\x00ACGT\x00"
);
}
#[test]
fn ambiguous_at_start_skipped() {
assert_eq!(run_fastq(&make_fastq(&[b"NNACGTACGT"]), 4), b"ACGTACGT\x00");
}
#[test]
fn ambiguous_at_end_produces_no_trailing_empty() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGTNN"]), 4), b"ACGTACGT\x00");
}
#[test]
fn crlf_handled() {
let data = b"@hdr\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n";
assert_eq!(run_fastq(data, 4), b"ACGTACGT\x00");
}
#[test]
fn multi_slice_rope() {
let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]);
let mid = data.len() / 2;
let mut rope = Rope::new(None);
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
assert_eq!(
flat(normalize_fastq_chunk(rope, 4)),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test]
fn fasta_single_record() {
assert_eq!(
run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT")]), 4),
b"ACGTACGT\x00"
);
}
#[test]
fn fasta_two_records() {
assert_eq!(
run_fasta(
&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]),
4
),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn fasta_multiline_sequence_concatenated() {
assert_eq!(
run_fasta(b">s1\nACGT\nACGT\nACGT\n", 4),
b"ACGTACGTACGT\x00"
);
}
#[test]
fn fasta_lowercase_uppercased() {
assert_eq!(
run_fasta(&make_fasta(&[(b"s1", b"acgtacgt")]), 4),
b"ACGTACGT\x00"
);
}
#[test]
fn fasta_short_record_discarded() {
assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"ACG")]), 4), b"");
}
#[test]
fn fasta_short_among_valid_discarded() {
assert_eq!(
run_fasta(
&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"AC"), (b"s3", b"TTTTTTTT")]),
4
),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn fasta_ambiguous_splits_segments() {
assert_eq!(run_fasta(b">s1\nACGTNACGT\n", 4), b"ACGT\x00ACGT\x00");
}
#[test]
fn fasta_ambiguous_across_line_boundary() {
assert_eq!(run_fasta(b">s1\nACGT\nNACGT\n", 4), b"ACGT\x00ACGT\x00");
}
#[test]
fn fasta_ambiguous_short_segment_discarded() {
assert_eq!(run_fasta(b">s1\nACGTACGTNAC\n", 4), b"ACGTACGT\x00");
}
#[test]
fn fasta_no_trailing_newline() {
assert_eq!(run_fasta(b">s1\nACGTACGT", 4), b"ACGTACGT\x00");
}
#[test]
fn fasta_crlf_line_endings() {
assert_eq!(
run_fasta(b">s1\r\nACGT\r\nACGT\r\n>s2\r\nTTTT\r\n", 4),
b"ACGTACGT\x00TTTT\x00"
);
}
#[test]
fn fasta_multi_slice_rope() {
let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]);
let mid = data.len() / 2;
let mut rope = Rope::new(None);
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
assert_eq!(
flat(normalize_fasta_chunk(rope, 4)),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
}
+733
View File
@@ -0,0 +1,733 @@
use std::io::{self, Read};
use std::mem::ManuallyDrop;
use std::sync::{Arc, Mutex};
use crate::mimetype::MimeTypeGuesser;
use crate::xopen::open_raw;
pub const MAX_K: usize = 31;
const PAGE_SIZE: usize = 65536;
// overlap (MAX_K - 1) + page data (PAGE_SIZE) + 1 byte for the end-of-page terminating 0
const BUF_SIZE: usize = MAX_K + PAGE_SIZE;
// ─── OverlapState ─────────────────────────────────────────────────────────────
pub(crate) struct OverlapState {
data: [u8; MAX_K],
len: usize,
k: usize,
}
impl OverlapState {
pub(crate) fn new(k: usize) -> Self {
assert!(k > 0 && k <= MAX_K);
Self {
data: [0u8; MAX_K],
len: 0,
k,
}
}
}
// ─── NucParser trait ──────────────────────────────────────────────────────────
// Transforms a raw page into a compacted nucleotide stream in-place.
//
// Buffer layout on each call:
// buf[0..overlap_len()] — overlap bytes copied by write_overlap()
// buf[overlap_len()..overlap_len()+n] — raw bytes just read from the source
//
// Returns the number of output bytes in buf[0..returned].
pub(crate) trait NucParser {
// required: format-specific
fn new(k: usize) -> Self
where
Self: Sized;
fn overlap_state(&self) -> &OverlapState;
fn overlap_state_mut(&mut self) -> &mut OverlapState;
fn is_in_seq(&self) -> bool;
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize;
// provided: format-independent overlap management
fn overlap_len(&self) -> usize {
self.overlap_state().len
}
fn write_overlap(&self, buf: &mut [u8]) {
let ol = &self.overlap_state();
buf[..ol.len].copy_from_slice(&ol.data[..ol.len]);
}
// Called at end of parse_inplace: saves overlap state and returns adjusted j.
// seq_start is the j-position where the last sequence started in this call's output.
fn save_overlap(&mut self, buf: &mut [u8], j: usize, seq_start: usize) -> usize {
if !self.is_in_seq() {
self.overlap_state_mut().len = 0;
return j;
}
let seq_len = j - seq_start;
let k = self.overlap_state().k;
if seq_len >= k {
// Sequence long enough: save last k-1 nucleotides, terminate with 0.
let ol = k - 1;
self.overlap_state_mut().data[..ol].copy_from_slice(&buf[j - ol..j]);
self.overlap_state_mut().len = ol;
// SAFETY: j <= total - 1 < BUF_SIZE = buf.len()
// (total = overlap_len + n <= (MAX_K-1) + PAGE_SIZE = BUF_SIZE - 1)
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j + 1
} else if seq_len > 0 {
// Short sequence (< k): save whole fragment, strip from output.
self.overlap_state_mut().data[..seq_len].copy_from_slice(&buf[seq_start..j]);
self.overlap_state_mut().len = seq_len;
seq_start
} else {
self.overlap_state_mut().len = 0;
j
}
}
}
// ─── FASTA parser ─────────────────────────────────────────────────────────────
#[derive(Clone, Copy)]
enum FastaState {
OutSeq,
InTitle,
InSeq,
InAmbiguous,
}
pub(crate) struct FastaParser {
state: FastaState,
overlap: OverlapState,
}
impl NucParser for FastaParser {
fn new(k: usize) -> Self {
Self {
state: FastaState::OutSeq,
overlap: OverlapState::new(k),
}
}
#[inline]
fn overlap_state(&self) -> &OverlapState {
&self.overlap
}
#[inline]
fn overlap_state_mut(&mut self) -> &mut OverlapState {
&mut self.overlap
}
#[inline]
fn is_in_seq(&self) -> bool {
matches!(self.state, FastaState::InSeq)
}
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize {
let total = self.overlap.len + n;
let mut i = 0; // read index
let mut j = 0; // write index (invariant: j <= i always)
// j-position where the current sequence started in this call's output;
// meaningful only when state is InSeq.
let mut seq_start: usize = 0;
while i < total {
// SAFETY: i < total <= BUF_SIZE = buf.len()
let byte = unsafe { *buf.get_unchecked(i) };
match self.state {
FastaState::OutSeq => {
if byte == b'>' {
self.state = FastaState::InTitle;
}
i += 1;
}
FastaState::InTitle => {
if byte == b'\n' || byte == b'\r' {
self.state = FastaState::InSeq;
seq_start = j;
}
i += 1;
}
FastaState::InSeq => {
if byte == b'\n' || byte == b'\r' {
i += 1;
continue;
}
let nuc = byte & 0xDF; // to uppercase
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
i += 1;
} else if byte == b'>' {
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastaState::InTitle;
i += 1;
} else {
// first ambiguous base: end current sequence if non-empty
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastaState::InAmbiguous;
i += 1;
}
}
FastaState::InAmbiguous => {
if byte == b'\n' || byte == b'\r' {
i += 1;
continue;
}
if byte == b'>' {
self.state = FastaState::InTitle;
i += 1;
continue;
}
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
seq_start = j;
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
self.state = FastaState::InSeq;
}
i += 1;
}
}
}
self.save_overlap(buf, j, seq_start)
}
}
// ─── FASTQ parser ─────────────────────────────────────────────────────────────
#[derive(Clone, Copy)]
enum FastqState {
OutSeq,
InTitle,
InSeq,
InAmbiguous,
InQualTitle,
InQual,
}
pub(crate) struct FastqParser {
state: FastqState,
overlap: OverlapState,
}
impl NucParser for FastqParser {
fn new(k: usize) -> Self {
Self {
state: FastqState::OutSeq,
overlap: OverlapState::new(k),
}
}
#[inline]
fn overlap_state(&self) -> &OverlapState {
&self.overlap
}
#[inline]
fn overlap_state_mut(&mut self) -> &mut OverlapState {
&mut self.overlap
}
#[inline]
fn is_in_seq(&self) -> bool {
matches!(self.state, FastqState::InSeq)
}
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize {
let total = self.overlap.len + n;
let mut i = 0;
let mut j = 0;
let mut seq_start: usize = 0;
while i < total {
// SAFETY: i < total <= BUF_SIZE = buf.len()
let byte = unsafe { *buf.get_unchecked(i) };
match self.state {
FastqState::OutSeq => {
if byte == b'@' {
self.state = FastqState::InTitle;
}
i += 1;
}
FastqState::InTitle => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::InSeq;
seq_start = j;
}
i += 1;
}
FastqState::InSeq => {
if byte == b'\n' || byte == b'\r' {
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastqState::InQualTitle;
i += 1;
continue;
}
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
} else {
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastqState::InAmbiguous;
}
i += 1;
}
FastqState::InAmbiguous => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::InQualTitle;
i += 1;
continue;
}
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
seq_start = j;
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
self.state = FastqState::InSeq;
}
i += 1;
}
FastqState::InQualTitle => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::InQual;
}
i += 1;
}
FastqState::InQual => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::OutSeq;
}
i += 1;
}
}
}
self.save_overlap(buf, j, seq_start)
}
}
// ─── GenBank parser ───────────────────────────────────────────────────────────
const ORIGIN_TAIL: &[u8] = b"RIGIN";
#[derive(Clone, Copy)]
enum GenbankState {
OutSeq,
MatchOrigin,
SkipOriginLine,
InSeq,
InSlash,
InAmbiguous,
}
pub(crate) struct GenbankParser {
state: GenbankState,
overlap: OverlapState,
keyword_pos: usize,
at_line_start: bool,
}
impl NucParser for GenbankParser {
fn new(k: usize) -> Self {
Self {
state: GenbankState::OutSeq,
overlap: OverlapState::new(k),
keyword_pos: 0,
at_line_start: true,
}
}
#[inline]
fn overlap_state(&self) -> &OverlapState {
&self.overlap
}
#[inline]
fn overlap_state_mut(&mut self) -> &mut OverlapState {
&mut self.overlap
}
#[inline]
fn is_in_seq(&self) -> bool {
matches!(self.state, GenbankState::InSeq)
}
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize {
let total = self.overlap.len + n;
let mut i = 0;
let mut j = 0;
let mut seq_start: usize = 0;
while i < total {
// SAFETY: i < total <= BUF_SIZE = buf.len()
let byte = unsafe { *buf.get_unchecked(i) };
match self.state {
GenbankState::OutSeq => {
if byte == b'\n' || byte == b'\r' {
self.at_line_start = true;
} else if self.at_line_start && byte == b'O' {
self.state = GenbankState::MatchOrigin;
self.keyword_pos = 1;
self.at_line_start = false;
} else {
self.at_line_start = false;
}
i += 1;
}
GenbankState::MatchOrigin => {
if byte == b'\n' || byte == b'\r' {
self.state = GenbankState::OutSeq;
self.at_line_start = true;
} else if byte == ORIGIN_TAIL[self.keyword_pos - 1] {
self.keyword_pos += 1;
if self.keyword_pos == 6 {
self.state = GenbankState::SkipOriginLine;
}
} else {
self.state = GenbankState::OutSeq;
self.at_line_start = false;
}
i += 1;
}
GenbankState::SkipOriginLine => {
if byte == b'\n' || byte == b'\r' {
self.state = GenbankState::InSeq;
seq_start = j;
}
i += 1;
}
GenbankState::InSeq => {
if byte == b'\n' || byte == b'\r' {
self.at_line_start = true;
i += 1;
continue;
}
if self.at_line_start && byte == b'/' {
self.state = GenbankState::InSlash;
self.at_line_start = false;
i += 1;
continue;
}
self.at_line_start = false;
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
} else if byte.is_ascii_digit() || byte == b' ' {
// position numbers and spacing between groups: skip
} else {
// ambiguous base: end current sequence if non-empty
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = GenbankState::InAmbiguous;
}
i += 1;
}
GenbankState::InSlash => {
if byte == b'/' {
// confirmed "//": end of sequence record
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = GenbankState::OutSeq;
self.at_line_start = false;
} else if byte == b'\n' || byte == b'\r' {
// single '/' line: back to sequence
self.state = GenbankState::InSeq;
self.at_line_start = true;
} else {
// false positive: single '/' mid-line, resume sequence
self.state = GenbankState::InSeq;
self.at_line_start = false;
}
i += 1;
}
GenbankState::InAmbiguous => {
if byte == b'\n' || byte == b'\r' {
self.at_line_start = true;
i += 1;
continue;
}
if self.at_line_start && byte == b'/' {
self.state = GenbankState::InSlash;
self.at_line_start = false;
i += 1;
continue;
}
self.at_line_start = false;
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
seq_start = j;
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
self.state = GenbankState::InSeq;
}
// digits, spaces, other ambiguous codes: skip
i += 1;
}
}
}
self.save_overlap(buf, j, seq_start)
}
}
// ─── NucPage ──────────────────────────────────────────────────────────────────
/// Owned page of compacted nucleotides: uppercase A/C/G/T bytes separated by `0`
/// at sequence boundaries. Automatically returns its buffer to the pool on drop.
pub struct NucPage {
data: ManuallyDrop<Vec<u8>>,
len: usize,
pool: Arc<Mutex<Vec<Vec<u8>>>>,
}
impl std::ops::Deref for NucPage {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.data[..self.len]
}
}
impl Drop for NucPage {
fn drop(&mut self) {
// SAFETY: data is never accessed after this point
let buf = unsafe { ManuallyDrop::take(&mut self.data) };
self.pool.lock().unwrap().push(buf);
}
}
// ─── NucPageCursor ────────────────────────────────────────────────────────────
/// A forward cursor over the normalised bytes of a [`NucPage`].
///
/// Provides the `next_byte` / `rewind` interface consumed by
/// [`obiskbuilder::SuperKmerStreamIter`].
pub struct NucPageCursor<'a> {
data: &'a [u8],
pos: usize,
}
impl NucPageCursor<'_> {
/// Returns the next byte in the page, or `None` at end.
#[inline]
pub fn next_byte(&mut self) -> Option<u8> {
if self.pos < self.data.len() {
let b = self.data[self.pos];
self.pos += 1;
Some(b)
} else {
None
}
}
/// Steps the cursor back by `n` bytes.
///
/// The caller guarantees that the last `n` bytes were all `ACGT`
/// (no `0x00` separators), so they are still in the page buffer.
#[inline]
pub fn rewind(&mut self, n: usize) {
self.pos -= n;
}
/// Total number of bytes in the underlying page.
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}
/// Returns `true` if the page contains no bytes.
#[inline]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
impl NucPage {
/// Creates a forward cursor positioned at the start of this page.
pub fn cursor(&self) -> NucPageCursor<'_> {
NucPageCursor { data: self, pos: 0 }
}
}
// ─── NucStream ────────────────────────────────────────────────────────────────
pub(crate) struct NucStream<R: Read, P: NucParser> {
reader: R,
parser: P,
pool: Arc<Mutex<Vec<Vec<u8>>>>,
eof: bool,
}
impl<R: Read, P: NucParser> NucStream<R, P> {
pub(crate) fn new(reader: R, k: usize) -> Self {
Self {
reader,
parser: P::new(k),
pool: Arc::new(Mutex::new(Vec::new())),
eof: false,
}
}
pub(crate) fn read_page(&mut self) -> Option<NucPage> {
loop {
if self.eof {
return None;
}
// take a buffer from the pool, or allocate fresh if all are in-flight
let mut buf = self
.pool
.lock()
.unwrap()
.pop()
.unwrap_or_else(|| vec![0u8; BUF_SIZE]);
let ol = self.parser.overlap_len();
self.parser.write_overlap(&mut buf[..ol]);
let n = self.reader.read(&mut buf[ol..ol + PAGE_SIZE]).unwrap_or(0);
if n == 0 {
self.eof = true;
if ol == 0 {
self.pool.lock().unwrap().push(buf);
return None;
}
}
let out_len = self.parser.parse_inplace(&mut buf, n);
if out_len > 0 {
return Some(NucPage {
data: ManuallyDrop::new(buf),
len: out_len,
pool: Arc::clone(&self.pool),
});
}
// empty page (all headers/ambiguous): return buf to pool and loop
self.pool.lock().unwrap().push(buf);
}
}
}
impl<R: Read, P: NucParser> Iterator for NucStream<R, P> {
type Item = NucPage;
fn next(&mut self) -> Option<NucPage> {
self.read_page()
}
}
// ─── FastaNucStream ───────────────────────────────────────────────────────────
pub(crate) type FastaNucStream<R> = NucStream<R, FastaParser>;
pub(crate) type FastqNucStream<R> = NucStream<R, FastqParser>;
pub(crate) type GenbankNucStream<R> = NucStream<R, GenbankParser>;
// ─── AnyNucStream ─────────────────────────────────────────────────────────────
pub(crate) enum AnyNucStream<R: Read> {
Fasta(FastaNucStream<R>),
Fastq(FastqNucStream<R>),
Genbank(GenbankNucStream<R>),
}
impl<R: Read> Iterator for AnyNucStream<R> {
type Item = NucPage;
fn next(&mut self) -> Option<NucPage> {
match self {
AnyNucStream::Fasta(s) => s.next(),
AnyNucStream::Fastq(s) => s.next(),
AnyNucStream::Genbank(s) => s.next(),
}
}
}
fn dispatch<R: Read>(
mut guesser: MimeTypeGuesser<R>,
k: usize,
) -> Option<AnyNucStream<MimeTypeGuesser<R>>> {
match guesser.mime_type() {
Some("text/fasta") => Some(AnyNucStream::Fasta(NucStream::new(guesser, k))),
Some("text/fastq") => Some(AnyNucStream::Fastq(NucStream::new(guesser, k))),
Some("text/gbff") => Some(AnyNucStream::Genbank(NucStream::new(guesser, k))),
_ => None,
}
}
/// Wraps an already-open reader in a nucleotide stream, detecting its format.
/// Returns `None` if the format is not recognised.
pub(crate) fn nuc_stream<R: Read>(reader: R, k: usize) -> Option<AnyNucStream<MimeTypeGuesser<R>>> {
dispatch(MimeTypeGuesser::new(reader), k)
}
/// Opens a nucleotide stream from any source (file path, URL, or `-` for stdin),
/// with transparent decompression and automatic format detection.
///
/// # Errors
/// Returns an `io::Error` if the source cannot be opened, decompression fails,
/// or the format is not recognised.
pub fn open_nuc_stream(
source: &str,
k: usize,
) -> io::Result<Box<dyn Iterator<Item = NucPage> + Send>> {
let reader = open_raw(source)?;
nuc_stream(reader, k)
.map(|s| Box::new(s) as Box<dyn Iterator<Item = NucPage> + Send>)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "unknown sequence format"))
}
#[cfg(test)]
#[path = "tests/nucstream.rs"]
mod tests;
+7 -1
View File
@@ -71,7 +71,7 @@ pub fn path_iter(paths: &[String]) -> PathIter {
PathIter::new(path_bufs) PathIter::new(path_bufs)
} }
/// Returns true if the path ends with a fasta or fastq file extension. /// Returns true if the path ends with a recognised sequence file extension.
fn is_fasta_or_fastq(path: &Path) -> bool { fn is_fasta_or_fastq(path: &Path) -> bool {
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
name.ends_with(".fasta") name.ends_with(".fasta")
@@ -82,4 +82,10 @@ fn is_fasta_or_fastq(path: &Path) -> bool {
|| name.ends_with(".fa.gz") || name.ends_with(".fa.gz")
|| name.ends_with(".fastq.gz") || name.ends_with(".fastq.gz")
|| name.ends_with(".fq.gz") || name.ends_with(".fq.gz")
|| name.ends_with(".gbff")
|| name.ends_with(".gbk")
|| name.ends_with(".gb")
|| name.ends_with(".gbff.gz")
|| name.ends_with(".gbk.gz")
|| name.ends_with(".gb.gz")
} }
+106
View File
@@ -0,0 +1,106 @@
use super::*;
use crate::fasta::end_of_last_fasta_entry;
use crate::fastq::end_of_last_fastq_entry;
fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fasta_entry, None)
}
fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fastq_entry, None)
}
fn rope_to_vec(rope: &Rope) -> Vec<u8> {
rope.fw_cursor().collect()
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test]
fn fasta_single_record_one_chunk() {
let data: &[u8] = b">s1\nACGT\n";
let chunks: Vec<_> = fasta_iter(data, 64).collect::<Result<_, _>>().unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(rope_to_vec(&chunks[0]), b">s1\nACGT\n");
}
#[test]
fn fasta_two_records_split_across_chunks() {
let data: &[u8] = b">s1\nACGT\n>s2\nTTTT\n";
let chunks: Vec<_> = fasta_iter(data, 10).collect::<Result<_, _>>().unwrap();
let all: Vec<u8> = chunks.iter().flat_map(|r| rope_to_vec(r)).collect();
assert_eq!(all, b">s1\nACGT\n>s2\nTTTT\n");
}
#[test]
fn fasta_each_chunk_ends_on_complete_record() {
let data: &[u8] = b">s1\nACGT\n>s2\nCCCC\n>s3\nGGGG\n>s4\nTTTT\n";
for block in [8, 12, 20, 100] {
let chunks: Vec<_> = fasta_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks {
let flat = rope_to_vec(rope);
assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'");
assert_eq!(
*flat.last().unwrap(),
b'\n',
"block={block}: chunk doesn't end with newline"
);
}
}
}
// ── FASTQ ─────────────────────────────────────────────────────────────────
fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (seq, qual) in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(qual);
buf.push(b'\n');
}
buf
}
#[test]
fn fastq_single_record_one_chunk() {
let data = Box::leak(make_fastq(&[(b"ACGT", b"IIII")]).into_boxed_slice());
let chunks: Vec<_> = fastq_iter(data, 64).collect::<Result<_, _>>().unwrap();
assert_eq!(chunks.len(), 1);
}
#[test]
fn fastq_at_in_quality_handled() {
let data = Box::leak(
make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTTTTTT", b"HHHHHHHH")])
.into_boxed_slice(),
);
let chunks: Vec<_> = fastq_iter(data, 16).collect::<Result<_, _>>().unwrap();
let all: Vec<u8> = chunks.iter().flat_map(|r| rope_to_vec(r)).collect();
assert_eq!(all, *data);
}
#[test]
fn fastq_each_chunk_starts_with_at() {
let data = Box::leak(
make_fastq(&[
(b"ACGT", b"IIII"),
(b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"),
(b"TTTT", b"LLLL"),
])
.into_boxed_slice(),
);
for block in [18, 30, 60] {
let chunks: Vec<_> = fastq_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks {
let first_byte = rope_to_vec(rope)[0];
assert_eq!(
first_byte, b'@',
"block={block}: chunk doesn't start with '@'"
);
}
}
}
+66
View File
@@ -0,0 +1,66 @@
use super::*;
fn rope(data: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn rope2(a: &[u8], b: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(a.to_vec());
r.push(b.to_vec());
r
}
fn flat(r: &Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
#[test]
fn single_entry_no_boundary() {
assert_eq!(end_of_last_fasta_entry(&rope(b">seq1\nACGT\n")), None);
}
#[test]
fn two_entries_cuts_at_second_header() {
let data = b">seq1\nACGT\n>seq2\nTTTT\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">seq2\nTTTT\n");
assert_eq!(&flat(&r)[..pos], b">seq1\nACGT\n");
}
#[test]
fn three_entries_cuts_at_last_header() {
let data = b">s1\nAA\n>s2\nCC\n>s3\nGG\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s3\nGG\n");
}
#[test]
fn multiline_sequence() {
let data = b">s1\nACGT\nACGT\n>s2\nTTTT\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s2\nTTTT\n");
}
#[test]
fn crlf_line_endings() {
let data = b">s1\r\nACGT\r\n>s2\r\nTTTT\r\n";
let r = rope(data);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s2\r\nTTTT\r\n");
}
#[test]
fn boundary_spans_two_blocks() {
let a = b">s1\nACGT\n";
let b = b">s2\nTTTT\n";
let r = rope2(a, b);
let all: Vec<u8> = flat(&r);
let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&all[pos..], b">s2\nTTTT\n");
}
+73
View File
@@ -0,0 +1,73 @@
use super::*;
fn rope(data: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (seq, qual) in records {
buf.extend_from_slice(b"@header\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(qual);
buf.push(b'\n');
}
buf
}
fn flat(r: &Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
#[test]
fn single_record_no_boundary() {
let buf = make_fastq(&[(b"ACGT", b"IIII")]);
assert_eq!(end_of_last_fastq_entry(&rope(&buf)), None);
}
#[test]
fn two_records_cuts_at_second() {
let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"TTTT", b"HHHH")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(flat(&r)[pos], b'@');
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()
);
}
#[test]
fn three_records_cuts_at_last() {
let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"CCCC", b"JJJJ"), (b"GGGG", b"KKKK")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"GGGG", b"KKKK")]).as_slice()
);
}
#[test]
fn at_sign_in_quality_does_not_confuse() {
let buf = make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTT", b"HHHH")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()
);
}
#[test]
fn crlf_line_endings() {
let data = b"@h\r\nACGT\r\n+\r\nIIII\r\n@h\r\nTTTT\r\n+\r\nHHHH\r\n";
let r = rope(data);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(flat(&r)[pos], b'@');
assert_eq!(&flat(&r)[pos..], b"@h\r\nTTTT\r\n+\r\nHHHH\r\n");
}
+234
View File
@@ -0,0 +1,234 @@
use super::*;
fn make_rope(data: &[u8]) -> Rope {
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn flat(r: Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
fn run_fastq(data: &[u8], k: usize) -> Vec<u8> {
flat(normalize_fastq_chunk(make_rope(data), k))
}
fn run_fasta(data: &[u8], k: usize) -> Vec<u8> {
flat(normalize_fasta_chunk(make_rope(data), k))
}
fn make_fastq(records: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::new();
for seq in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(&vec![b'I'; seq.len()]);
buf.push(b'\n');
}
buf
}
fn make_fasta(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (id, seq) in records {
buf.push(b'>');
buf.extend_from_slice(id);
buf.push(b'\n');
buf.extend_from_slice(seq);
buf.push(b'\n');
}
buf
}
// ── FASTQ basic ──────────────────────────────────────────────────────────
#[test]
fn single_record_produces_seq_then_null() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGT"]), 4), b"ACGTACGT\x00");
}
#[test]
fn two_records_concatenated() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]), 4),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn lowercase_input_uppercased() {
assert_eq!(run_fastq(&make_fastq(&[b"acgtacgt"]), 4), b"ACGTACGT\x00");
}
#[test]
fn mixed_case_uppercased() {
assert_eq!(run_fastq(&make_fastq(&[b"AcGtAcGt"]), 4), b"ACGTACGT\x00");
}
#[test]
fn sequence_shorter_than_k_discarded() {
assert_eq!(run_fastq(&make_fastq(&[b"ACG"]), 4), b"");
}
#[test]
fn sequence_exactly_k_kept() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGT"]), 4), b"ACGT\x00");
}
#[test]
fn short_record_among_valid_ones_discarded() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGT", b"AC", b"TTTTTTTT"]), 4),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn ambiguous_splits_into_two_segments() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTNACGT"]), 4),
b"ACGT\x00ACGT\x00"
);
}
#[test]
fn segment_after_ambiguous_too_short_discarded() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGTNAC"]), 4),
b"ACGTACGT\x00"
);
}
#[test]
fn consecutive_ambiguous_produce_no_empty_segment() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTNNNNACGT"]), 4),
b"ACGT\x00ACGT\x00"
);
}
#[test]
fn ambiguous_at_start_skipped() {
assert_eq!(run_fastq(&make_fastq(&[b"NNACGTACGT"]), 4), b"ACGTACGT\x00");
}
#[test]
fn ambiguous_at_end_produces_no_trailing_empty() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGTNN"]), 4), b"ACGTACGT\x00");
}
#[test]
fn crlf_handled() {
let data = b"@hdr\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n";
assert_eq!(run_fastq(data, 4), b"ACGTACGT\x00");
}
#[test]
fn multi_slice_rope() {
let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]);
let mid = data.len() / 2;
let mut rope = Rope::new(None);
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
assert_eq!(
flat(normalize_fastq_chunk(rope, 4)),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test]
fn fasta_single_record() {
assert_eq!(
run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT")]), 4),
b"ACGTACGT\x00"
);
}
#[test]
fn fasta_two_records() {
assert_eq!(
run_fasta(
&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]),
4
),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn fasta_multiline_sequence_concatenated() {
assert_eq!(
run_fasta(b">s1\nACGT\nACGT\nACGT\n", 4),
b"ACGTACGTACGT\x00"
);
}
#[test]
fn fasta_lowercase_uppercased() {
assert_eq!(
run_fasta(&make_fasta(&[(b"s1", b"acgtacgt")]), 4),
b"ACGTACGT\x00"
);
}
#[test]
fn fasta_short_record_discarded() {
assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"ACG")]), 4), b"");
}
#[test]
fn fasta_short_among_valid_discarded() {
assert_eq!(
run_fasta(
&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"AC"), (b"s3", b"TTTTTTTT")]),
4
),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn fasta_ambiguous_splits_segments() {
assert_eq!(run_fasta(b">s1\nACGTNACGT\n", 4), b"ACGT\x00ACGT\x00");
}
#[test]
fn fasta_ambiguous_across_line_boundary() {
assert_eq!(run_fasta(b">s1\nACGT\nNACGT\n", 4), b"ACGT\x00ACGT\x00");
}
#[test]
fn fasta_ambiguous_short_segment_discarded() {
assert_eq!(run_fasta(b">s1\nACGTACGTNAC\n", 4), b"ACGTACGT\x00");
}
#[test]
fn fasta_no_trailing_newline() {
assert_eq!(run_fasta(b">s1\nACGTACGT", 4), b"ACGTACGT\x00");
}
#[test]
fn fasta_crlf_line_endings() {
assert_eq!(
run_fasta(b">s1\r\nACGT\r\nACGT\r\n>s2\r\nTTTT\r\n", 4),
b"ACGTACGT\x00TTTT\x00"
);
}
#[test]
fn fasta_multi_slice_rope() {
let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]);
let mid = data.len() / 2;
let mut rope = Rope::new(None);
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
assert_eq!(
flat(normalize_fasta_chunk(rope, 4)),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
+267
View File
@@ -0,0 +1,267 @@
use super::*;
use std::io::Cursor;
use std::ops::Deref;
// ── helpers ───────────────────────────────────────────────────────────────
fn run_fasta(data: &[u8], k: usize) -> Vec<u8> {
NucStream::<_, FastaParser>::new(Cursor::new(data.to_vec()), k)
.flat_map(|p| p.deref().to_vec())
.collect()
}
fn run_fastq(data: &[u8], k: usize) -> Vec<u8> {
NucStream::<_, FastqParser>::new(Cursor::new(data.to_vec()), k)
.flat_map(|p| p.deref().to_vec())
.collect()
}
fn run_genbank(data: &[u8], k: usize) -> Vec<u8> {
NucStream::<_, GenbankParser>::new(Cursor::new(data.to_vec()), k)
.flat_map(|p| p.deref().to_vec())
.collect()
}
fn pages_fasta(data: &[u8], k: usize) -> Vec<Vec<u8>> {
NucStream::<_, FastaParser>::new(Cursor::new(data.to_vec()), k)
.map(|p| p.deref().to_vec())
.collect()
}
// ── FastaParser ───────────────────────────────────────────────────────────
#[test]
fn fasta_single_sequence() {
assert_eq!(run_fasta(b">s1\nACGTACGT\n", 4), b"ACGTACGT\x00");
}
#[test]
fn fasta_lowercase_uppercased() {
assert_eq!(run_fasta(b">s1\nacgtacgt\n", 4), b"ACGTACGT\x00");
}
#[test]
fn fasta_multiline_sequence_concatenated() {
assert_eq!(run_fasta(b">s1\nACGT\nACGT\n", 4), b"ACGTACGT\x00");
}
#[test]
fn fasta_two_sequences() {
let data = b">s1\nACGTACGT\n>s2\nTTTTTTTT\n";
assert_eq!(run_fasta(data, 4), b"ACGTACGT\x00TTTTTTTT\x00");
}
#[test]
fn fasta_empty_input_yields_no_pages() {
assert_eq!(run_fasta(b"", 4), b"");
}
#[test]
fn fasta_sequence_shorter_than_k_at_eof_discarded() {
// The 3-base fragment is saved as overlap and dropped at EOF (< k).
assert_eq!(run_fasta(b">s1\nACG\n", 4), b"");
}
#[test]
fn fasta_ambiguous_splits_into_two_segments() {
assert_eq!(run_fasta(b">s1\nACGTNACGT\n", 4), b"ACGT\x00ACGT\x00");
}
#[test]
fn fasta_short_segment_before_ambiguous_emitted() {
// "AC" (< k=4) before N is written with a separator — filtering by
// length is deferred to the superkmer builder, not done here.
assert_eq!(run_fasta(b">s1\nACNACGTACGT\n", 4), b"AC\x00ACGTACGT\x00");
}
#[test]
fn fasta_ambiguous_at_start_skipped() {
assert_eq!(run_fasta(b">s1\nNNNACGTACGT\n", 4), b"ACGTACGT\x00");
}
// ── FastqParser ───────────────────────────────────────────────────────────
#[test]
fn fastq_single_record() {
assert_eq!(
run_fastq(b"@r1\nACGTACGT\n+\nIIIIIIII\n", 4),
b"ACGTACGT\x00"
);
}
#[test]
fn fastq_lowercase_uppercased() {
assert_eq!(
run_fastq(b"@r1\nacgtacgt\n+\nIIIIIIII\n", 4),
b"ACGTACGT\x00"
);
}
#[test]
fn fastq_quality_bytes_not_in_output() {
// '@' (Phred 31 = ASCII 64) in quality must not appear in output.
assert_eq!(
run_fastq(b"@r1\nACGTACGT\n+\n@@@@@@@@\n", 4),
b"ACGTACGT\x00"
);
}
#[test]
fn fastq_two_records() {
let data = b"@r1\nACGTACGT\n+\nIIIIIIII\n@r2\nTTTTTTTT\n+\nIIIIIIII\n";
assert_eq!(run_fastq(data, 4), b"ACGTACGT\x00TTTTTTTT\x00");
}
#[test]
fn fastq_ambiguous_splits_sequence() {
assert_eq!(
run_fastq(b"@r1\nACGTNACGT\n+\nIIIIIIIII\n", 4),
b"ACGT\x00ACGT\x00"
);
}
#[test]
fn fastq_at_in_quality_line_not_a_record_start() {
// '@' in the quality line must not trigger a new record parse.
let data = b"@r1\nACGTACGT\n+\n@@@@@@@@\n@r2\nTTTTTTTT\n+\nIIIIIIII\n";
assert_eq!(run_fastq(data, 4), b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── GenbankParser ─────────────────────────────────────────────────────────
#[test]
fn genbank_origin_to_slash() {
let data = b"LOCUS ...\nORIGIN\n 1 acgtacgt\n//\n";
assert_eq!(run_genbank(data, 4), b"ACGTACGT\x00");
}
#[test]
fn genbank_position_numbers_and_spaces_skipped() {
let data = b"ORIGIN\n 1 acgt acgt\n//\n";
assert_eq!(run_genbank(data, 4), b"ACGTACGT\x00");
}
#[test]
fn genbank_two_records() {
let data = b"ORIGIN\n 1 acgtacgt\n//\nLOCUS ...\nORIGIN\n 1 tttttttt\n//\n";
assert_eq!(run_genbank(data, 4), b"ACGTACGT\x00TTTTTTTT\x00");
}
#[test]
fn genbank_ambiguous_splits_sequence() {
let data = b"ORIGIN\n 1 acgtnacgt\n//\n";
assert_eq!(run_genbank(data, 4), b"ACGT\x00ACGT\x00");
}
// ── NucPage ───────────────────────────────────────────────────────────────
#[test]
fn nuc_page_deref_correct_bytes() {
let page = NucStream::<_, FastaParser>::new(Cursor::new(b">s1\nACGT\n".to_vec()), 4)
.next()
.expect("page");
assert_eq!(page.deref(), b"ACGT\x00");
}
// ── NucPageCursor ─────────────────────────────────────────────────────────
fn make_page(data: &[u8], k: usize) -> NucPage {
NucStream::<_, FastaParser>::new(Cursor::new(data.to_vec()), k)
.next()
.expect("at least one page")
}
#[test]
fn cursor_reads_bytes_in_order() {
let page = make_page(b">s1\nACGTACGT\n", 4);
let mut cur = page.cursor();
assert_eq!(cur.next_byte(), Some(b'A'));
assert_eq!(cur.next_byte(), Some(b'C'));
assert_eq!(cur.next_byte(), Some(b'G'));
assert_eq!(cur.next_byte(), Some(b'T'));
}
#[test]
fn cursor_rewind_rereads_bytes() {
let page = make_page(b">s1\nACGTACGT\n", 4);
let mut cur = page.cursor();
cur.next_byte(); // A
cur.next_byte(); // C
cur.rewind(1);
assert_eq!(cur.next_byte(), Some(b'C'));
cur.rewind(2);
assert_eq!(cur.next_byte(), Some(b'A'));
}
#[test]
fn cursor_returns_none_at_end() {
// "ACGT\x00" = 5 bytes; consume all then expect None.
let page = make_page(b">s1\nACGT\n", 4);
let mut cur = page.cursor();
for _ in 0..5 {
cur.next_byte();
}
assert_eq!(cur.next_byte(), None);
}
#[test]
fn cursor_len_matches_page_content() {
// "ACGTACGT\x00" = 9 bytes
let page = make_page(b">s1\nACGTACGT\n", 4);
let cur = page.cursor();
assert_eq!(cur.len(), 9);
assert!(!cur.is_empty());
}
// ── Overlap at page boundary ──────────────────────────────────────────────
#[test]
fn overlap_last_km1_bytes_prepended_to_next_page() {
const K: usize = 11;
// Sequence long enough to span two pages: PAGE_SIZE + K bytes.
// Pattern chosen so boundary bytes are unambiguous.
let seq: Vec<u8> = (0..PAGE_SIZE + K).map(|i| b"ACGT"[i % 4]).collect();
let mut input = b">seq\n".to_vec();
input.extend_from_slice(&seq);
input.push(b'\n');
let pages = pages_fasta(&input, K);
assert!(pages.len() >= 2, "need at least two pages");
let p1 = &pages[0];
let p2 = &pages[1];
// page1 must end with a \x00 separator (written by save_overlap)
assert_eq!(*p1.last().unwrap(), 0x00, "page1 must end with separator");
// last K-1 ACGT bytes of page1 == first K-1 bytes of page2
let ol = K - 1;
let p1_seq_end = &p1[p1.len() - 1 - ol..p1.len() - 1];
let p2_start = &p2[..ol];
assert_eq!(
p1_seq_end, p2_start,
"overlap bytes mismatch at page boundary"
);
}
// ── Pool ──────────────────────────────────────────────────────────────────
#[test]
fn pool_buffer_reused_after_drop() {
// Drop page1 so its buffer returns to the pool, then verify page2
// is produced correctly (no corruption, no panic).
const K: usize = 11;
let seq: Vec<u8> = vec![b'A'; PAGE_SIZE + K];
let mut input = b">seq\n".to_vec();
input.extend_from_slice(&seq);
input.push(b'\n');
let mut stream = NucStream::<_, FastaParser>::new(Cursor::new(input), K);
let page1 = stream.next().expect("page 1");
assert!(!page1.deref().is_empty());
drop(page1); // returns buffer to pool
let page2 = stream.next().expect("page 2");
assert!(!page2.deref().is_empty());
// page2 must still start with A's (overlap from page1)
assert_eq!(page2[0], b'A');
}
+12 -5
View File
@@ -20,13 +20,13 @@ use std::io::{self, Read};
/// Open any source for reading, with transparent decompression. /// Open any source for reading, with transparent decompression.
/// ///
/// Returns a `Box<dyn Read>` that yields uncompressed bytes regardless of /// Returns a `Box<dyn Read + Send>` that yields uncompressed bytes regardless
/// whether the underlying source is plain text, gzip, bzip2, xz or zstd. /// of whether the underlying source is plain text, gzip, bzip2, xz or zstd.
/// ///
/// # Errors /// # Errors
/// Returns an `io::Error` if the file cannot be opened, the URL cannot be /// Returns an `io::Error` if the file cannot be opened, the URL cannot be
/// fetched, or the compression header is malformed. /// fetched, or the compression header is malformed.
pub fn xopen(source: &str) -> io::Result<MimeTypeGuesser<Box<dyn Read + Send>>> { pub(crate) fn open_raw(source: &str) -> io::Result<Box<dyn Read + Send>> {
let raw: Box<dyn Read + Send> = match source { let raw: Box<dyn Read + Send> = match source {
"-" => Box::new(io::stdin()), "-" => Box::new(io::stdin()),
s if s.starts_with("http://") || s.starts_with("https://") => http_reader(s)?, s if s.starts_with("http://") || s.starts_with("https://") => http_reader(s)?,
@@ -35,8 +35,15 @@ pub fn xopen(source: &str) -> io::Result<MimeTypeGuesser<Box<dyn Read + Send>>>
Box::new(File::open(expanded.as_ref())?) Box::new(File::open(expanded.as_ref())?)
} }
}; };
let decompressed = decompress(raw)?; decompress(raw)
Ok(MimeTypeGuesser::new(decompressed)) }
/// Open any source for reading, with transparent decompression and MIME detection.
///
/// Wraps [`open_raw`] in a [`MimeTypeGuesser`] so callers can inspect the
/// format before consuming the stream.
pub fn xopen(source: &str) -> io::Result<MimeTypeGuesser<Box<dyn Read + Send>>> {
Ok(MimeTypeGuesser::new(open_raw(source)?))
} }
// ── internal helpers ────────────────────────────────────────────────────────── // ── internal helpers ──────────────────────────────────────────────────────────
+1
View File
@@ -6,6 +6,7 @@ edition = "2024"
[dependencies] [dependencies]
obikseq = { path = "../obikseq" } obikseq = { path = "../obikseq" }
obikrope = { path = "../obikrope" } obikrope = { path = "../obikrope" }
obiread = { path = "../obiread" }
lazy_static = "1.5.0" lazy_static = "1.5.0"
[dev-dependencies] [dev-dependencies]
+14
View File
@@ -6,6 +6,7 @@
#![deny(missing_docs)] #![deny(missing_docs)]
pub mod iter; pub mod iter;
pub mod stream_iter;
mod scratch; mod scratch;
pub(crate) mod encoding; pub(crate) mod encoding;
@@ -14,10 +15,23 @@ pub(crate) mod rolling_stat;
pub use iter::SuperKmerIter; pub use iter::SuperKmerIter;
pub use scratch::SuperKmerScratch; pub use scratch::SuperKmerScratch;
pub use stream_iter::SuperKmerStreamIter;
use obiread::NucPage;
use obikrope::Rope; use obikrope::Rope;
use obikseq::RoutableSuperKmer; use obikseq::RoutableSuperKmer;
/// Collect all super-kmers from a normalised [`NucPage`].
pub fn build_superkmers_page(
page: NucPage,
k: usize,
level_max: usize,
theta: f64,
) -> Vec<RoutableSuperKmer> {
let cursor = page.cursor();
SuperKmerStreamIter::new(cursor, k, level_max, theta).collect()
}
/// Collect all super-kmers from a normalised rope chunk. /// Collect all super-kmers from a normalised rope chunk.
pub fn build_superkmers( pub fn build_superkmers(
rope: Rope, rope: Rope,
+129
View File
@@ -0,0 +1,129 @@
//! Streaming superkmer iterator over a [`NucPageCursor`].
use obiread::NucPageCursor;
use obikseq::RoutableSuperKmer;
use obikseq::kmer::Minimizer;
use crate::rolling_stat::RollingStat;
use crate::scratch::SuperKmerScratch;
/// Streaming iterator over [`RoutableSuperKmer`] values from a [`NucPageCursor`].
///
/// Cut conditions (checked in order per nucleotide, once the k-mer window is full):
///
/// | Condition | cursor rewind |
/// |------------------------|---------------|
/// | entropy(kmer) ≤ θ | k1 |
/// | minimizer changed | k |
/// | super-kmer length = 256| k |
pub struct SuperKmerStreamIter<'a> {
cursor: NucPageCursor<'a>,
k: usize,
theta: f64,
scratch: SuperKmerScratch,
stat: RollingStat,
prev_min: Option<Minimizer>,
prev_min_pos: usize,
}
impl<'a> SuperKmerStreamIter<'a> {
/// Build an iterator from a [`NucPageCursor`] over normalised sequence data.
pub fn new(cursor: NucPageCursor<'a>, k: usize, level_max: usize, theta: f64) -> Self {
Self {
cursor,
k,
theta,
scratch: SuperKmerScratch::new(),
stat: RollingStat::new(level_max),
prev_min: None,
prev_min_pos: 0,
}
}
fn reset(&mut self) {
self.stat.reset();
self.scratch.reset();
self.prev_min = None;
self.prev_min_pos = 0;
}
fn try_emit(&mut self) -> Option<RoutableSuperKmer> {
if self.scratch.len() < self.k {
return None;
}
self.prev_min?;
Some(self.scratch.emit(self.prev_min_pos))
}
}
impl Iterator for SuperKmerStreamIter<'_> {
type Item = RoutableSuperKmer;
fn next(&mut self) -> Option<RoutableSuperKmer> {
loop {
let byte = match self.cursor.next_byte() {
None => {
return self.try_emit();
}
Some(0x00) => {
let result = self.try_emit();
self.reset();
if result.is_some() {
return result;
}
continue;
}
Some(b) => b,
};
self.stat.push(byte);
if !self.stat.ready() {
self.scratch.push(byte);
continue;
}
// ── 1. Entropy check ─────────────────────────────────────────────
if self.stat.normalized_entropy().unwrap_or(1.0) < self.theta {
let result = self.try_emit();
self.cursor.rewind(self.k - 1);
self.reset();
if result.is_some() {
return result;
}
continue;
}
let min = self.stat.canonical_minimizer().unwrap();
let min_pos = self.stat.minimizer_position().unwrap_or(0);
// ── 2. Minimizer change ───────────────────────────────────────────
if let Some(prev) = self.prev_min {
if min != prev {
let result = self.try_emit();
self.cursor.rewind(self.k);
self.reset();
if result.is_some() {
return result;
}
continue;
}
}
// ── 3. Super-kmer length cap ──────────────────────────────────────
if self.scratch.len() == 256 {
let result = self.try_emit();
self.cursor.rewind(self.k);
self.reset();
if result.is_some() {
return result;
}
continue;
}
self.prev_min = Some(min);
self.prev_min_pos = min_pos;
self.scratch.push(byte);
}
}
}