From 13599dd444e972cbb3b0f21d10b01f1bd9cbe976 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 21 May 2026 13:23:05 +0200 Subject: [PATCH] feat: Implement query subcommand for sequence-to-genome mapping This change introduces the `query` CLI command and its supporting infrastructure for sequence-to-genome mapping and k-mer matching. It adds a `QueryLayer` abstraction backed by MPHF and persistent matrices, exposes the index partition for direct querying, and implements `Hash`/`Eq` for `RoutableSuperKmer`. The command ingests sequence batches, deduplicates superkmers, routes them to index partitions for parallel exact or 1-mismatch matching, and outputs results as FASTA records annotated with JSON metadata. Includes `serde_json` dependency addition, module exports, and documentation updates. --- TODO.md | 20 +- docmd/architecture/query.md | 111 ++++++++++ src/Cargo.lock | 1 + src/obikindex/src/index.rs | 5 + src/obikmer/Cargo.toml | 1 + src/obikmer/src/cmd/mod.rs | 1 + src/obikmer/src/cmd/query.rs | 281 ++++++++++++++++++++++++ src/obikmer/src/main.rs | 3 + src/obikpartitionner/src/lib.rs | 1 + src/obikpartitionner/src/query_layer.rs | 120 ++++++++++ src/obikseq/src/routable.rs | 14 ++ src/obiread/src/lib.rs | 1 + src/obiread/src/record.rs | 222 +++++++++++++++++++ 13 files changed, 762 insertions(+), 19 deletions(-) create mode 100644 docmd/architecture/query.md create mode 100644 src/obikmer/src/cmd/query.rs create mode 100644 src/obikpartitionner/src/query_layer.rs create mode 100644 src/obiread/src/record.rs diff --git a/TODO.md b/TODO.md index e6ba5d0..a874150 100644 --- a/TODO.md +++ b/TODO.md @@ -1,26 +1,8 @@ -## Chose à vérifier suite à la commande index - -- il faudrait lister les fichier qui vont être indexés -- partition.meta ne devrait plus exister -- les spectrums globaux devrait etre identifier par génome - - regrouper dans un sous-dossier spectrums à la racine de l'index avec un nom basé sur le génome -- les spectrum patiels ont-ils vocation à être conserver ? -- l'étape de déreplication dure quasiment autant de temps que le comptage mais ne laisse aucune trace de progression à l'utilisateur - ## commandes à ajouter -- filter : produit un nouvel index filtré à partir d'un index existant en verifiant que les kmer présents dans le nouvel index respectent les critères de filtrage spécifiés - - quorum de presence en fraction-(min/max) du nombre de génomes, en nombre-(min/max) de génomes, si mode count la présence peut être défini par un seuil personnalisé minimum et maximum - - aggregate : aggrege toutes les colonnes d'une matrice d'index en une seule colonne. - query : scan un fichier de sequences et retourne pour chaque sequence quels kmer sont présents dans l'index et dans quel genomes - -- distance : calcule la matrice de distance entre les genomes - - proposer une option pour chaque distance à calculer - - un possibité de récuperer la matrice des kmer communs - - un possibité de calculer l'arbre nj - - les matrices sont sauvegardées en CSV - - les arbres NJ sont sauvegardés en Newick avec les longeurs de branche + --detail et --mismatch à implementer - status : affiche le statut de l'index diff --git a/docmd/architecture/query.md b/docmd/architecture/query.md new file mode 100644 index 0000000..6a11a11 --- /dev/null +++ b/docmd/architecture/query.md @@ -0,0 +1,111 @@ +# Query system + +## Goal + +Given a set of query sequences, determine for each sequence how many of its k-mers are found in the index and, for each indexed genome, how many k-mers match. The query system is the foundation for read classification and sequence-to-genome mapping. + +--- + +## Input + +- Query sequences in FASTA or FASTQ format (gzip supported, streaming stdin supported). +- Sequences shorter than k bases are silently skipped. +- Non-ACGT characters are handled by the superkmer decomposition layer: they act as hard breaks, producing shorter superkmers (identical to the behaviour at indexing time). + +--- + +## Algorithm + +The query follows the same superkmer-based partitioning strategy used at indexing time. + +``` +for each query sequence: + decompose into superkmers (non-ACGT breaks, same minimiser scheme as indexing) + for each superkmer: + route to partition p via minimiser hash + for each kmer in the superkmer: + lookup kmer in partition p (MPHF → evidence check → matrix row) + accumulate result into per-sequence accumulators + emit annotated sequence +``` + +Parallelism is **per sequence**: each worker thread handles all partitions of one sequence independently. This avoids cross-thread coordination when merging partial results and keeps memory usage proportional to the number of concurrent sequences rather than to the number of partitions. + +--- + +## Exact vs. approximate matching + +### Exact (default) + +Standard MPHF lookup followed by evidence check. O(1) per k-mer. + +### 1-mismatch (`--mismatch` flag) + +For each k-mer of the query, generate all `3·k` single-substitution variants. Each variant is canonicalised and looked up independently in the index. If one or more variants are found, their per-genome rows are **summed** into the result for that k-mer position. + +- If a k-mer matches exactly AND one of its variants also matches (distinct k-mers in the index), both contributions are accumulated. +- Exact and approximate matches are tracked separately in the output (see annotation schema below). +- The superkmer routing optimisation is **not** used in 1-mismatch mode: each variant is looked up directly via its own minimiser. +- Cost: up to `3·k` MPHF probes per k-mer position vs. 1 in exact mode. + +--- + +## Output format + +Output sequences are written in **OBITools4 format**: the original sequence with a JSON annotation map in the title line. + +``` +>read_id {"kmer_total":150,"kmer_found":59,...} +ATCGATCG... +``` + +Genome order in all list-valued annotations follows the genome order recorded in `index.meta`. + +--- + +## Annotation schema + +### Summary mode (default) + +| Key | Type | Condition | Semantics | +|---|---|---|---| +| `kmer_total` | int | always | total k-mers in the (masked) sequence | +| `kmer_found` | int | always | k-mers with at least one match (exact or approx) | +| `kmer_missing` | int | `--count-missing` | k-mers absent from the index | +| `kmer_match` | list[int] | always | per-genome matched k-mer count (exact + approx) | +| `kmer_match_exact` | list[int] | `--mismatch` | per-genome exact match count | +| `kmer_match_approx` | list[int] | `--mismatch` | per-genome approx match count | +| `count_match` | list[int] | count index | per-genome sum of index counts for matched k-mers | + +`kmer_match[i]` is the number of k-mer positions in the query that contribute at least one match to genome i. In 1-mismatch mode, a single k-mer position can contribute to multiple genomes if several of its variants are present in the index. + +`count_match[i]` sums raw index counts across all matched k-mer positions for genome i. Only meaningful for count indexes. + +### Detail mode (`--detail`) + +All summary keys, plus per-position coverage vectors — one list per genome, length `len(sequence) − k + 1`: + +| Key | Type | Condition | Semantics | +|---|---|---|---| +| `cov_` | list[int] | `--detail` | coverage at each k-mer position for genome i; raw count (count index) or 0/1 (presence index); 0 if absent | +| `cov_exact_` | list[int] | `--detail` + `--mismatch` | exact-match contribution per position | +| `cov_approx_` | list[int] | `--detail` + `--mismatch` | approx-match contribution per position | + +Genome indices in key names are 0-based integers matching the `index.meta` genome order. Genome labels are not used as key names to avoid issues with special characters in long or complex genome identifiers. + +--- + +## CLI + +``` +obikmer query -i [--summary | --detail] [--mismatch] [--count-missing] +``` + +`--summary` is the default; `--detail` implies `--summary` (all summary keys are always present). + +--- + +## Future work + +- **Read classification** (`--classify`): assign each read to the genome with the highest `kmer_match` score; emit as a single annotation key. +- **Whitelist / blacklist filtering**: accept or reject sequences based on whether their k-mer match score for a designated set of genomes exceeds a threshold. diff --git a/src/Cargo.lock b/src/Cargo.lock index 1de3042..a07c18c 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1514,6 +1514,7 @@ dependencies = [ "obisys", "pprof", "rayon", + "serde_json", "speedytree", "tracing", "tracing-subscriber", diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs index 16c1154..07792a4 100644 --- a/src/obikindex/src/index.rs +++ b/src/obikindex/src/index.rs @@ -185,6 +185,11 @@ impl KmerIndex { Ok(()) } + /// Borrow the inner partition for direct superkmer-level queries. + pub fn partition(&self) -> &KmerPartition { + &self.partition + } + /// Path to the unitigs file for partition `part`, layer `layer`. pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf { self.partition.part_dir(part) diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index fffb28b..053bc97 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -19,6 +19,7 @@ obisys = { path = "../obisys" } obiskio = { path = "../obiskio" } obikindex = { path = "../obikindex" } clap = { version = "4", features = ["derive"] } +serde_json = "1" kodama = "0.2" speedytree = "0.1" rayon = "1" diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index ac4057e..b71a789 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -2,6 +2,7 @@ pub mod distance; pub mod dump; pub mod index; pub mod merge; +pub mod query; pub mod rebuild; pub mod superkmer; pub mod unitig; diff --git a/src/obikmer/src/cmd/query.rs b/src/obikmer/src/cmd/query.rs new file mode 100644 index 0000000..2067067 --- /dev/null +++ b/src/obikmer/src/cmd/query.rs @@ -0,0 +1,281 @@ +use std::collections::HashMap; +use std::io::{self, BufWriter, Write}; +use std::path::PathBuf; + +use clap::Args; +use obikindex::KmerIndex; +use obiread::record::{SeqRecord, parse_chunk}; +use obiread::chunk::read_sequence_chunks; +use obikseq::{RoutableSuperKmer, set_k, set_m}; +use obiskbuilder::SuperKmerIter; +use tracing::info; + +// ── CLI ─────────────────────────────────────────────────────────────────────── + +#[derive(Args)] +pub struct QueryArgs { + /// Index directory + pub index: PathBuf, + + /// Input sequences (FASTA/FASTQ, optionally gzip-compressed) + #[arg(num_args = 1..)] + pub inputs: Vec, + + /// Also report per-position coverage vectors (cov_ per genome) + #[arg(long)] + pub detail: bool, + + /// Enable 1-mismatch approximate matching + #[arg(long)] + pub mismatch: bool, + + /// Count k-mers absent from the index (adds kmer_missing annotation) + #[arg(long)] + pub count_missing: bool, + + /// Report per-genome presence (0/1) instead of raw counts + #[arg(long)] + pub force_presence: bool, + + /// Minimum accumulated match count to declare a genome present (implies --force-presence) + #[arg(long, default_value_t = 1)] + pub presence_threshold: u32, + + /// Number of worker threads + #[arg( + short = 'T', + long, + default_value_t = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + )] + pub threads: usize, +} + +// ── SKDesc — one occurrence of a superkmer in the batch ─────────────────────── + +/// Describes one occurrence of a superkmer in the query batch. +pub struct SKDesc { + /// Index of the source sequence within the batch. + pub seq_idx: u32, + /// Kmer offset of the first kmer of this superkmer within its sequence. + /// Computed as the cumulative number of kmers emitted before this superkmer + /// in the same sequence. Used for `--detail` coverage vectors. + pub kmer_offset: u32, +} + +// ── QueryBatch ──────────────────────────────────────────────────────────────── + +/// A batch of query sequences with their superkmers deduplicated. +/// +/// Each unique `RoutableSuperKmer` maps to all the (seq_idx, kmer_offset) +/// positions it occupies across the batch. The superkmer is queried once +/// per partition; the result is broadcast to every SKDesc entry. +pub struct QueryBatch { + /// Sequence ids in batch order. + pub ids: Vec, + /// Raw sequence bytes (for output), in batch order. + pub seqs: Vec>, + /// Per-sequence total kmer count (kmer_count + kmer_missing). + pub n_kmers: Vec, + /// Deduplicated superkmer map. + pub map: HashMap>, +} + +impl QueryBatch { + /// Build a batch from a vec of parsed sequence records. + pub fn from_records( + records: Vec, + k: usize, + level_max: usize, + theta: f64, + ) -> Self { + let mut ids = Vec::with_capacity(records.len()); + let mut seqs = Vec::with_capacity(records.len()); + let mut n_kmers = Vec::with_capacity(records.len()); + let mut map: HashMap> = HashMap::new(); + + for (seq_idx, record) in records.into_iter().enumerate() { + let mut kmer_offset = 0u32; + + for rsk in SuperKmerIter::new(&record.normalized, k, level_max, theta) { + let n = (rsk.seql() - k + 1) as u32; + map.entry(rsk) + .or_default() + .push(SKDesc { seq_idx: seq_idx as u32, kmer_offset }); + kmer_offset += n; + } + + ids.push(record.id); + seqs.push(record.sequence); + n_kmers.push(kmer_offset); + } + + Self { ids, seqs, n_kmers, map } + } + + /// Split the superkmer map by partition index. + /// + /// Returns a vec of length `n_partitions`; each slot holds the RSK refs + /// whose minimizer routes to that partition. + pub fn split_by_partition(&self, n_partitions: usize) -> Vec> { + let mask = (n_partitions as u64) - 1; + let mut by_part: Vec> = vec![Vec::new(); n_partitions]; + for rsk in self.map.keys() { + let part = (rsk.minimizer().seq_hash() & mask) as usize; + by_part[part].push(rsk); + } + by_part + } +} + +// ── Per-sequence accumulator ────────────────────────────────────────────────── + +struct SeqAcc { + kmer_count: u32, + kmer_missing: u32, + /// Per-genome accumulated count (count mode) or presence sum (presence mode). + genome_totals: Vec, +} + +impl SeqAcc { + fn new(n_genomes: usize) -> Self { + Self { + kmer_count: 0, + kmer_missing: 0, + genome_totals: vec![0u32; n_genomes], + } + } +} + +// ── Entry point ─────────────────────────────────────────────────────────────── + +pub fn run(args: QueryArgs) { + let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| { + eprintln!("error opening index: {e}"); + std::process::exit(1); + }); + + set_k(idx.kmer_size()); + set_m(idx.minimizer_size()); + + let k = idx.kmer_size(); + let n_genomes = idx.meta().genomes.len(); + let n_partitions = idx.n_partitions(); + let with_counts = idx.meta().config.with_counts; + + info!( + "query: k={k}, {} genome(s), with_counts={with_counts}, mismatch={}, detail={}", + n_genomes, args.mismatch, args.detail + ); + + if args.mismatch { + eprintln!("warning: --mismatch not yet implemented, ignored"); + } + if args.detail { + eprintln!("warning: --detail not yet implemented, ignored"); + } + + let paths: Vec = args.inputs.iter().map(PathBuf::from).collect(); + let mut out = BufWriter::new(io::stdout()); + + for path in &paths { + let chunks = read_sequence_chunks(path.to_str().unwrap_or("")) + .unwrap_or_else(|e| { + eprintln!("error opening {}: {e}", path.display()); + std::process::exit(1); + }); + + for chunk_result in chunks { + let chunk = chunk_result.unwrap_or_else(|e| { + eprintln!("read error: {e}"); + std::process::exit(1); + }); + + let records = parse_chunk(&chunk, k); + if records.is_empty() { + continue; + } + + let batch = QueryBatch::from_records(records, k, 6, 0.7); + let n_seqs = batch.ids.len(); + + let mut accs: Vec = (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); + + let by_part = batch.split_by_partition(n_partitions); + + for (part_idx, part_sks) in by_part.iter().enumerate() { + if part_sks.is_empty() { + continue; + } + + let kmer_results = idx + .partition() + .query_partition(part_idx, part_sks, k, n_genomes, with_counts) + .unwrap_or_else(|e| { + eprintln!("query error on partition {part_idx}: {e}"); + std::process::exit(1); + }); + + let presence = args.force_presence || !with_counts; + let threshold = args.presence_threshold; + + for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) { + let descs = batch.map.get(*rsk).expect("rsk must be in map"); + for desc in descs { + let acc = &mut accs[desc.seq_idx as usize]; + for hit in sk_kmer_results.iter() { + match hit { + None => acc.kmer_missing += 1, + Some(row) => { + acc.kmer_count += 1; + for (g, &v) in row.iter().enumerate() { + acc.genome_totals[g] += if presence { + u32::from(v >= threshold) + } else { + v + }; + } + } + } + } + } + } + } + + emit_batch(&batch, &accs, idx.meta(), args.count_missing, &mut out); + } + } +} + +// ── Output ──────────────────────────────────────────────────────────────────── + +fn emit_batch( + batch: &QueryBatch, + accs: &[SeqAcc], + meta: &obikindex::meta::IndexMeta, + count_missing: bool, + out: &mut impl Write, +) { + for (seq_idx, (id, seq)) in batch.ids.iter().zip(batch.seqs.iter()).enumerate() { + let acc = &accs[seq_idx]; + let mut ann = serde_json::Map::new(); + + ann.insert("kmer_count".into(), acc.kmer_count.into()); + if count_missing { + ann.insert("kmer_missing".into(), acc.kmer_missing.into()); + } + let mut match_map = serde_json::Map::new(); + for (g, label) in meta.genomes.iter().enumerate() { + match_map.insert(label.clone(), acc.genome_totals[g].into()); + } + ann.insert("kmer_strict_matches".into(), match_map.into()); + + let ann_str = serde_json::to_string(&ann).unwrap_or_else(|_| "{}".to_string()); + + // OBITools4 FASTA format: >id {"key":value,...} + let _ = writeln!(out, ">{id} {ann_str}"); + let _ = out.write_all(seq); + let _ = out.write_all(b"\n"); + } +} diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index bae4cdf..1b8f833 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -22,6 +22,8 @@ enum Commands { Merge(cmd::merge::MergeArgs), /// Filter and compact an existing index into a new single-layer index Rebuild(cmd::rebuild::RebuildArgs), + /// Query an index with sequences and annotate matches + Query(cmd::query::QueryArgs), /// Dump all indexed kmers as CSV (kmer + per-genome counts or presence) Dump(cmd::dump::DumpArgs), /// Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees @@ -54,6 +56,7 @@ fn main() { Commands::Merge(args) => cmd::merge::run(args), Commands::Dump(args) => cmd::dump::run(args), Commands::Rebuild(args) => cmd::rebuild::run(args), + Commands::Query(args) => cmd::query::run(args), Commands::Distance(args) => cmd::distance::run(args), Commands::Unitig(args) => cmd::unitig::run(args), } diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 48d72c1..49a81fc 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -5,6 +5,7 @@ mod index_layer; mod kmer_sort; mod merge_layer; mod partition; +mod query_layer; mod rebuild_layer; pub use filter::KmerFilter; diff --git a/src/obikpartitionner/src/query_layer.rs b/src/obikpartitionner/src/query_layer.rs new file mode 100644 index 0000000..f18a0d5 --- /dev/null +++ b/src/obikpartitionner/src/query_layer.rs @@ -0,0 +1,120 @@ +use std::path::Path; + +use obicompactvec::{PersistentBitMatrix, PersistentCompactIntMatrix}; +use obikseq::{CanonicalKmer, RoutableSuperKmer}; +use obiskio::{SKError, SKResult}; +use obilayeredmap::{MphfLayer, OLMError}; +use obilayeredmap::meta::PartitionMeta; + +use crate::partition::KmerPartition; + +const INDEX_SUBDIR: &str = "index"; + +fn olm_to_sk(e: OLMError) -> SKError { + match e { + OLMError::Io(io_err) => SKError::Io(io_err), + other => SKError::InvalidData { context: "query", detail: other.to_string() }, + } +} + +// ── per-layer query handle ──────────────────────────────────────────────────── + +enum QueryLayer { + /// Layer<()> — MPHF-only, no data matrix; all indexed kmers map to 1 per genome. + SetOnly(MphfLayer), + Presence(MphfLayer, PersistentBitMatrix), + Count(MphfLayer, PersistentCompactIntMatrix), +} + +impl QueryLayer { + fn open(layer_dir: &Path, with_counts: bool) -> SKResult { + let presence_dir = layer_dir.join("presence"); + let counts_dir = layer_dir.join("counts"); + + if with_counts && counts_dir.exists() { + let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?; + let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + Ok(QueryLayer::Count(mphf, mat)) + } else if presence_dir.exists() { + let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?; + let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; + Ok(QueryLayer::Presence(mphf, mat)) + } else if counts_dir.exists() { + // presence query on a count index — return counts as-is + let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?; + let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + Ok(QueryLayer::Count(mphf, mat)) + } else { + let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?; + Ok(QueryLayer::SetOnly(mphf)) + } + } + + /// Return `Some(per-genome row)` if `kmer` is indexed in this layer, else `None`. + fn find(&self, kmer: CanonicalKmer, n_genomes: usize) -> Option> { + match self { + QueryLayer::SetOnly(mphf) => { + mphf.find(kmer) + .map(|_| vec![1u32; n_genomes].into_boxed_slice()) + } + QueryLayer::Presence(mphf, mat) => { + mphf.find(kmer) + .map(|slot| mat.row(slot).iter().map(|&b| b as u32).collect()) + } + QueryLayer::Count(mphf, mat) => { + mphf.find(kmer).map(|slot| mat.row(slot)) + } + } + } +} + +// ── KmerPartition::query_partition ─────────────────────────────────────────── + +impl KmerPartition { + /// Query a single partition for a slice of (already-routed) super-kmers. + /// + /// Returns one entry per input super-kmer; each entry is a `Vec` with one + /// `Option>` per k-mer inside that super-kmer: + /// - `None` — k-mer absent from the index + /// - `Some(row)` — per-genome count (count index) or 0/1 (presence index) + /// + /// All `superkmers` must belong to this partition (same minimizer bucket). + pub fn query_partition( + &self, + part_idx: usize, + superkmers: &[&RoutableSuperKmer], + k: usize, + n_genomes: usize, + with_counts: bool, + ) -> SKResult>>>> { + if superkmers.is_empty() { + return Ok(Vec::new()); + } + + let index_dir = self.part_dir(part_idx).join(INDEX_SUBDIR); + + if !index_dir.exists() { + return Ok(superkmers + .iter() + .map(|rsk| vec![None; rsk.seql() - k + 1]) + .collect()); + } + + let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; + let layers: Vec = (0..meta.n_layers) + .map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts)) + .collect::>()?; + + Ok(superkmers + .iter() + .map(|rsk| { + rsk.superkmer() + .iter_canonical_kmers() + .map(|kmer| { + layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) + }) + .collect() + }) + .collect()) + } +} diff --git a/src/obikseq/src/routable.rs b/src/obikseq/src/routable.rs index a4b2338..411fcb0 100644 --- a/src/obikseq/src/routable.rs +++ b/src/obikseq/src/routable.rs @@ -71,6 +71,20 @@ impl RoutableSuperKmer { } } +impl PartialEq for RoutableSuperKmer { + fn eq(&self, other: &Self) -> bool { + self.superkmer == other.superkmer + } +} + +impl Eq for RoutableSuperKmer {} + +impl std::hash::Hash for RoutableSuperKmer { + fn hash(&self, state: &mut H) { + self.superkmer.hash(state); + } +} + impl Sequence for RoutableSuperKmer { type Canonical = RoutableSuperKmer; diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index a07d250..771d839 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -12,6 +12,7 @@ mod mimetype; pub mod normalize; mod path_iterator; pub mod peakreader; +pub mod record; pub mod xopen; pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks, diff --git a/src/obiread/src/record.rs b/src/obiread/src/record.rs new file mode 100644 index 0000000..fb5e141 --- /dev/null +++ b/src/obiread/src/record.rs @@ -0,0 +1,222 @@ +//! Per-sequence record parser for FASTA and FASTQ chunks. +//! +//! Same automaton structure as `normalize.rs` — only the actions differ: +//! instead of writing into a single flat rope, we accumulate per-sequence +//! data (id, raw ASCII, normalised ACGT\x00 rope). + +use obikrope::{ForwardCursor, Rope, RopeCursor}; + +/// One sequence record extracted from a FASTA or FASTQ chunk. +pub struct SeqRecord { + /// Sequence identifier (everything before the first space in the header). + pub id: String, + /// Raw sequence bytes, newlines stripped, non-ACGT characters preserved. + /// Reproduced verbatim in query output. + pub sequence: Vec, + /// Per-sequence normalised rope: uppercase ACGT segments of length ≥ k + /// separated by `\x00`. Ready for `SuperKmerIter`. + pub normalized: Rope, +} + +/// Parse all records from a FASTA or FASTQ chunk rope. +/// Returns an empty vec if the rope carries no recognised mime type. +pub fn parse_chunk(rope: &Rope, k: usize) -> Vec { + let cursor = rope.fw_cursor(); + match rope.mime_type() { + Some("text/fasta") => parse_fasta(cursor, k), + Some("text/fastq") => parse_fastq(cursor, k), + _ => vec![], + } +} + +// ── Shared state accumulated while scanning one sequence ────────────────────── + +struct RecordBuilder { + id: String, + sequence: Vec, // raw ASCII, no newlines + norm: Vec, // ACGT\x00 segments being built + seg_start: usize, // index in norm where current segment started + k: usize, +} + +impl RecordBuilder { + fn new(k: usize) -> Self { + Self { id: String::new(), sequence: Vec::new(), norm: Vec::new(), seg_start: 0, k } + } + + fn reset(&mut self, id: String) { + self.id = id; + self.sequence.clear(); + self.norm.clear(); + self.seg_start = 0; + } + + /// Push one accepted ACGT byte. + fn push_acgt(&mut self, b: u8) { + self.sequence.push(b); + self.norm.push(b); + } + + /// Push one non-ACGT byte to the raw sequence only (not to the norm buffer). + fn push_raw(&mut self, b: u8) { + self.sequence.push(b); + } + + /// Close the current ACGT segment (same logic as `end_segment` in normalize.rs). + fn end_segment(&mut self) { + if self.norm.len() - self.seg_start >= self.k { + self.norm.push(0x00); + self.seg_start = self.norm.len(); + } else { + self.norm.truncate(self.seg_start); + } + } + + /// Consume into a SeqRecord. Closes any open segment first. + fn finish(mut self) -> Option { + self.end_segment(); + if self.id.is_empty() { + return None; + } + let mut rope = Rope::new(None); + if !self.norm.is_empty() { + rope.push(self.norm); + } + Some(SeqRecord { id: self.id, sequence: self.sequence, normalized: rope }) + } +} + +// ── FASTA automaton ─────────────────────────────────────────────────────────── + +fn parse_fasta(cursor: ForwardCursor<'_>, k: usize) -> Vec { + let mut records: Vec = Vec::new(); + let mut builder = RecordBuilder::new(k); + + // skip up to (and including) the first '>' + loop { + match cursor.read_next().ok() { + None => return records, + Some(b'>') => break, + Some(_) => {} + } + } + + // read first id — read_id already consumes the full header line + builder.id = read_id(&cursor); + + loop { + match cursor.read_next().ok() { + None => { + // EOF — close final segment and emit + if let Some(rec) = builder.finish() { + records.push(rec); + } + return records; + } + Some(b'\n') | Some(b'\r') => { + // peek: next non-empty char determines if new record starts + match cursor.read_ahead(1).ok() { + Some(b'>') => { + // end of current record + builder.end_segment(); + if let Some(rec) = builder.finish() { + records.push(rec); + } + cursor.read_next().ok(); // consume '>' + let id = read_id(&cursor); // already consumes header line + builder = RecordBuilder::new(k); + builder.reset(id); + } + None => { + builder.end_segment(); + if let Some(rec) = builder.finish() { + records.push(rec); + } + return records; + } + Some(_) => {} // continuation line — do nothing + } + } + Some(b) => { + let upper = b & !0x20u8; + if matches!(upper, b'A' | b'C' | b'G' | b'T') { + builder.push_acgt(upper); + } else { + builder.push_raw(b); + builder.end_segment(); + } + } + } + } +} + +// ── FASTQ automaton ─────────────────────────────────────────────────────────── + +fn parse_fastq(cursor: ForwardCursor<'_>, k: usize) -> Vec { + let mut records: Vec = Vec::new(); + + loop { + // find '@' + loop { + match cursor.read_next().ok() { + None => return records, + Some(b'@') => break, + Some(_) => {} + } + } + + let mut builder = RecordBuilder::new(k); + builder.id = read_id(&cursor); // already consumes the full header line + + // sequence line — stop at newline, non-ACGT breaks segment + loop { + match cursor.read_next().ok() { + None | Some(b'\n') | Some(b'\r') => { + builder.end_segment(); + break; + } + Some(b) => { + let upper = b & !0x20u8; + if matches!(upper, b'A' | b'C' | b'G' | b'T') { + builder.push_acgt(upper); + } else { + builder.push_raw(b); + builder.end_segment(); + } + } + } + } + + skip_line(&cursor); // '+' line + skip_line(&cursor); // quality line + + if let Some(rec) = builder.finish() { + records.push(rec); + } + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn read_id(cursor: &ForwardCursor<'_>) -> String { + let mut id = Vec::new(); + loop { + match cursor.read_next().ok() { + None | Some(b'\n') | Some(b'\r') => break, + Some(b' ') | Some(b'\t') => { + skip_line(cursor); + break; + } + Some(b) => id.push(b), + } + } + String::from_utf8_lossy(&id).into_owned() +} + +fn skip_line(cursor: &ForwardCursor<'_>) { + while let Some(c) = cursor.read_next().ok() { + if c == b'\n' { + return; + } + } +}