Push qkpyqurltlpk #1
@@ -1,26 +1,8 @@
|
||||
## Chose à vérifier suite à la commande index
|
||||
|
||||
- il faudrait lister les fichier qui vont être indexés
|
||||
- partition.meta ne devrait plus exister
|
||||
- les spectrums globaux devrait etre identifier par génome
|
||||
- regrouper dans un sous-dossier spectrums à la racine de l'index avec un nom basé sur le génome
|
||||
- les spectrum patiels ont-ils vocation à être conserver ?
|
||||
- l'étape de déreplication dure quasiment autant de temps que le comptage mais ne laisse aucune trace de progression à l'utilisateur
|
||||
|
||||
## commandes à ajouter
|
||||
|
||||
- filter : produit un nouvel index filtré à partir d'un index existant en verifiant que les kmer présents dans le nouvel index respectent les critères de filtrage spécifiés
|
||||
- quorum de presence en fraction-(min/max) du nombre de génomes, en nombre-(min/max) de génomes, si mode count la présence peut être défini par un seuil personnalisé minimum et maximum
|
||||
|
||||
- aggregate : aggrege toutes les colonnes d'une matrice d'index en une seule colonne.
|
||||
|
||||
- query : scan un fichier de sequences et retourne pour chaque sequence quels kmer sont présents dans l'index et dans quel genomes
|
||||
|
||||
- distance : calcule la matrice de distance entre les genomes
|
||||
- proposer une option pour chaque distance à calculer
|
||||
- un possibité de récuperer la matrice des kmer communs
|
||||
- un possibité de calculer l'arbre nj
|
||||
- les matrices sont sauvegardées en CSV
|
||||
- les arbres NJ sont sauvegardés en Newick avec les longeurs de branche
|
||||
--detail et --mismatch à implementer
|
||||
|
||||
- status : affiche le statut de l'index
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
# Query system
|
||||
|
||||
## Goal
|
||||
|
||||
Given a set of query sequences, determine for each sequence how many of its k-mers are found in the index and, for each indexed genome, how many k-mers match. The query system is the foundation for read classification and sequence-to-genome mapping.
|
||||
|
||||
---
|
||||
|
||||
## Input
|
||||
|
||||
- Query sequences in FASTA or FASTQ format (gzip supported, streaming stdin supported).
|
||||
- Sequences shorter than k bases are silently skipped.
|
||||
- Non-ACGT characters are handled by the superkmer decomposition layer: they act as hard breaks, producing shorter superkmers (identical to the behaviour at indexing time).
|
||||
|
||||
---
|
||||
|
||||
## Algorithm
|
||||
|
||||
The query follows the same superkmer-based partitioning strategy used at indexing time.
|
||||
|
||||
```
|
||||
for each query sequence:
|
||||
decompose into superkmers (non-ACGT breaks, same minimiser scheme as indexing)
|
||||
for each superkmer:
|
||||
route to partition p via minimiser hash
|
||||
for each kmer in the superkmer:
|
||||
lookup kmer in partition p (MPHF → evidence check → matrix row)
|
||||
accumulate result into per-sequence accumulators
|
||||
emit annotated sequence
|
||||
```
|
||||
|
||||
Parallelism is **per sequence**: each worker thread handles all partitions of one sequence independently. This avoids cross-thread coordination when merging partial results and keeps memory usage proportional to the number of concurrent sequences rather than to the number of partitions.
|
||||
|
||||
---
|
||||
|
||||
## Exact vs. approximate matching
|
||||
|
||||
### Exact (default)
|
||||
|
||||
Standard MPHF lookup followed by evidence check. O(1) per k-mer.
|
||||
|
||||
### 1-mismatch (`--mismatch` flag)
|
||||
|
||||
For each k-mer of the query, generate all `3·k` single-substitution variants. Each variant is canonicalised and looked up independently in the index. If one or more variants are found, their per-genome rows are **summed** into the result for that k-mer position.
|
||||
|
||||
- If a k-mer matches exactly AND one of its variants also matches (distinct k-mers in the index), both contributions are accumulated.
|
||||
- Exact and approximate matches are tracked separately in the output (see annotation schema below).
|
||||
- The superkmer routing optimisation is **not** used in 1-mismatch mode: each variant is looked up directly via its own minimiser.
|
||||
- Cost: up to `3·k` MPHF probes per k-mer position vs. 1 in exact mode.
|
||||
|
||||
---
|
||||
|
||||
## Output format
|
||||
|
||||
Output sequences are written in **OBITools4 format**: the original sequence with a JSON annotation map in the title line.
|
||||
|
||||
```
|
||||
>read_id {"kmer_total":150,"kmer_found":59,...}
|
||||
ATCGATCG...
|
||||
```
|
||||
|
||||
Genome order in all list-valued annotations follows the genome order recorded in `index.meta`.
|
||||
|
||||
---
|
||||
|
||||
## Annotation schema
|
||||
|
||||
### Summary mode (default)
|
||||
|
||||
| Key | Type | Condition | Semantics |
|
||||
|---|---|---|---|
|
||||
| `kmer_total` | int | always | total k-mers in the (masked) sequence |
|
||||
| `kmer_found` | int | always | k-mers with at least one match (exact or approx) |
|
||||
| `kmer_missing` | int | `--count-missing` | k-mers absent from the index |
|
||||
| `kmer_match` | list[int] | always | per-genome matched k-mer count (exact + approx) |
|
||||
| `kmer_match_exact` | list[int] | `--mismatch` | per-genome exact match count |
|
||||
| `kmer_match_approx` | list[int] | `--mismatch` | per-genome approx match count |
|
||||
| `count_match` | list[int] | count index | per-genome sum of index counts for matched k-mers |
|
||||
|
||||
`kmer_match[i]` is the number of k-mer positions in the query that contribute at least one match to genome i. In 1-mismatch mode, a single k-mer position can contribute to multiple genomes if several of its variants are present in the index.
|
||||
|
||||
`count_match[i]` sums raw index counts across all matched k-mer positions for genome i. Only meaningful for count indexes.
|
||||
|
||||
### Detail mode (`--detail`)
|
||||
|
||||
All summary keys, plus per-position coverage vectors — one list per genome, length `len(sequence) − k + 1`:
|
||||
|
||||
| Key | Type | Condition | Semantics |
|
||||
|---|---|---|---|
|
||||
| `cov_<i>` | list[int] | `--detail` | coverage at each k-mer position for genome i; raw count (count index) or 0/1 (presence index); 0 if absent |
|
||||
| `cov_exact_<i>` | list[int] | `--detail` + `--mismatch` | exact-match contribution per position |
|
||||
| `cov_approx_<i>` | list[int] | `--detail` + `--mismatch` | approx-match contribution per position |
|
||||
|
||||
Genome indices in key names are 0-based integers matching the `index.meta` genome order. Genome labels are not used as key names to avoid issues with special characters in long or complex genome identifiers.
|
||||
|
||||
---
|
||||
|
||||
## CLI
|
||||
|
||||
```
|
||||
obikmer query -i <index> [--summary | --detail] [--mismatch] [--count-missing] <query.fa>
|
||||
```
|
||||
|
||||
`--summary` is the default; `--detail` implies `--summary` (all summary keys are always present).
|
||||
|
||||
---
|
||||
|
||||
## Future work
|
||||
|
||||
- **Read classification** (`--classify`): assign each read to the genome with the highest `kmer_match` score; emit as a single annotation key.
|
||||
- **Whitelist / blacklist filtering**: accept or reject sequences based on whether their k-mer match score for a designated set of genomes exceeds a threshold.
|
||||
Generated
+1
@@ -1514,6 +1514,7 @@ dependencies = [
|
||||
"obisys",
|
||||
"pprof",
|
||||
"rayon",
|
||||
"serde_json",
|
||||
"speedytree",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
|
||||
@@ -185,6 +185,11 @@ impl KmerIndex {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Borrow the inner partition for direct superkmer-level queries.
|
||||
pub fn partition(&self) -> &KmerPartition {
|
||||
&self.partition
|
||||
}
|
||||
|
||||
/// Path to the unitigs file for partition `part`, layer `layer`.
|
||||
pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf {
|
||||
self.partition.part_dir(part)
|
||||
|
||||
@@ -19,6 +19,7 @@ obisys = { path = "../obisys" }
|
||||
obiskio = { path = "../obiskio" }
|
||||
obikindex = { path = "../obikindex" }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
kodama = "0.2"
|
||||
speedytree = "0.1"
|
||||
rayon = "1"
|
||||
|
||||
@@ -2,6 +2,7 @@ pub mod distance;
|
||||
pub mod dump;
|
||||
pub mod index;
|
||||
pub mod merge;
|
||||
pub mod query;
|
||||
pub mod rebuild;
|
||||
pub mod superkmer;
|
||||
pub mod unitig;
|
||||
|
||||
@@ -0,0 +1,281 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::{self, BufWriter, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use clap::Args;
|
||||
use obikindex::KmerIndex;
|
||||
use obiread::record::{SeqRecord, parse_chunk};
|
||||
use obiread::chunk::read_sequence_chunks;
|
||||
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
||||
use obiskbuilder::SuperKmerIter;
|
||||
use tracing::info;
|
||||
|
||||
// ── CLI ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Args)]
|
||||
pub struct QueryArgs {
|
||||
/// Index directory
|
||||
pub index: PathBuf,
|
||||
|
||||
/// Input sequences (FASTA/FASTQ, optionally gzip-compressed)
|
||||
#[arg(num_args = 1..)]
|
||||
pub inputs: Vec<String>,
|
||||
|
||||
/// Also report per-position coverage vectors (cov_<i> per genome)
|
||||
#[arg(long)]
|
||||
pub detail: bool,
|
||||
|
||||
/// Enable 1-mismatch approximate matching
|
||||
#[arg(long)]
|
||||
pub mismatch: bool,
|
||||
|
||||
/// Count k-mers absent from the index (adds kmer_missing annotation)
|
||||
#[arg(long)]
|
||||
pub count_missing: bool,
|
||||
|
||||
/// Report per-genome presence (0/1) instead of raw counts
|
||||
#[arg(long)]
|
||||
pub force_presence: bool,
|
||||
|
||||
/// Minimum accumulated match count to declare a genome present (implies --force-presence)
|
||||
#[arg(long, default_value_t = 1)]
|
||||
pub presence_threshold: u32,
|
||||
|
||||
/// Number of worker threads
|
||||
#[arg(
|
||||
short = 'T',
|
||||
long,
|
||||
default_value_t = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1)
|
||||
)]
|
||||
pub threads: usize,
|
||||
}
|
||||
|
||||
// ── SKDesc — one occurrence of a superkmer in the batch ───────────────────────
|
||||
|
||||
/// Describes one occurrence of a superkmer in the query batch.
|
||||
pub struct SKDesc {
|
||||
/// Index of the source sequence within the batch.
|
||||
pub seq_idx: u32,
|
||||
/// Kmer offset of the first kmer of this superkmer within its sequence.
|
||||
/// Computed as the cumulative number of kmers emitted before this superkmer
|
||||
/// in the same sequence. Used for `--detail` coverage vectors.
|
||||
pub kmer_offset: u32,
|
||||
}
|
||||
|
||||
// ── QueryBatch ────────────────────────────────────────────────────────────────
|
||||
|
||||
/// A batch of query sequences with their superkmers deduplicated.
|
||||
///
|
||||
/// Each unique `RoutableSuperKmer` maps to all the (seq_idx, kmer_offset)
|
||||
/// positions it occupies across the batch. The superkmer is queried once
|
||||
/// per partition; the result is broadcast to every SKDesc entry.
|
||||
pub struct QueryBatch {
|
||||
/// Sequence ids in batch order.
|
||||
pub ids: Vec<String>,
|
||||
/// Raw sequence bytes (for output), in batch order.
|
||||
pub seqs: Vec<Vec<u8>>,
|
||||
/// Per-sequence total kmer count (kmer_count + kmer_missing).
|
||||
pub n_kmers: Vec<u32>,
|
||||
/// Deduplicated superkmer map.
|
||||
pub map: HashMap<RoutableSuperKmer, Vec<SKDesc>>,
|
||||
}
|
||||
|
||||
impl QueryBatch {
|
||||
/// Build a batch from a vec of parsed sequence records.
|
||||
pub fn from_records(
|
||||
records: Vec<SeqRecord>,
|
||||
k: usize,
|
||||
level_max: usize,
|
||||
theta: f64,
|
||||
) -> Self {
|
||||
let mut ids = Vec::with_capacity(records.len());
|
||||
let mut seqs = Vec::with_capacity(records.len());
|
||||
let mut n_kmers = Vec::with_capacity(records.len());
|
||||
let mut map: HashMap<RoutableSuperKmer, Vec<SKDesc>> = HashMap::new();
|
||||
|
||||
for (seq_idx, record) in records.into_iter().enumerate() {
|
||||
let mut kmer_offset = 0u32;
|
||||
|
||||
for rsk in SuperKmerIter::new(&record.normalized, k, level_max, theta) {
|
||||
let n = (rsk.seql() - k + 1) as u32;
|
||||
map.entry(rsk)
|
||||
.or_default()
|
||||
.push(SKDesc { seq_idx: seq_idx as u32, kmer_offset });
|
||||
kmer_offset += n;
|
||||
}
|
||||
|
||||
ids.push(record.id);
|
||||
seqs.push(record.sequence);
|
||||
n_kmers.push(kmer_offset);
|
||||
}
|
||||
|
||||
Self { ids, seqs, n_kmers, map }
|
||||
}
|
||||
|
||||
/// Split the superkmer map by partition index.
|
||||
///
|
||||
/// Returns a vec of length `n_partitions`; each slot holds the RSK refs
|
||||
/// whose minimizer routes to that partition.
|
||||
pub fn split_by_partition(&self, n_partitions: usize) -> Vec<Vec<&RoutableSuperKmer>> {
|
||||
let mask = (n_partitions as u64) - 1;
|
||||
let mut by_part: Vec<Vec<&RoutableSuperKmer>> = vec![Vec::new(); n_partitions];
|
||||
for rsk in self.map.keys() {
|
||||
let part = (rsk.minimizer().seq_hash() & mask) as usize;
|
||||
by_part[part].push(rsk);
|
||||
}
|
||||
by_part
|
||||
}
|
||||
}
|
||||
|
||||
// ── Per-sequence accumulator ──────────────────────────────────────────────────
|
||||
|
||||
struct SeqAcc {
|
||||
kmer_count: u32,
|
||||
kmer_missing: u32,
|
||||
/// Per-genome accumulated count (count mode) or presence sum (presence mode).
|
||||
genome_totals: Vec<u32>,
|
||||
}
|
||||
|
||||
impl SeqAcc {
|
||||
fn new(n_genomes: usize) -> Self {
|
||||
Self {
|
||||
kmer_count: 0,
|
||||
kmer_missing: 0,
|
||||
genome_totals: vec![0u32; n_genomes],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
pub fn run(args: QueryArgs) {
|
||||
let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| {
|
||||
eprintln!("error opening index: {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
set_k(idx.kmer_size());
|
||||
set_m(idx.minimizer_size());
|
||||
|
||||
let k = idx.kmer_size();
|
||||
let n_genomes = idx.meta().genomes.len();
|
||||
let n_partitions = idx.n_partitions();
|
||||
let with_counts = idx.meta().config.with_counts;
|
||||
|
||||
info!(
|
||||
"query: k={k}, {} genome(s), with_counts={with_counts}, mismatch={}, detail={}",
|
||||
n_genomes, args.mismatch, args.detail
|
||||
);
|
||||
|
||||
if args.mismatch {
|
||||
eprintln!("warning: --mismatch not yet implemented, ignored");
|
||||
}
|
||||
if args.detail {
|
||||
eprintln!("warning: --detail not yet implemented, ignored");
|
||||
}
|
||||
|
||||
let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect();
|
||||
let mut out = BufWriter::new(io::stdout());
|
||||
|
||||
for path in &paths {
|
||||
let chunks = read_sequence_chunks(path.to_str().unwrap_or(""))
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("error opening {}: {e}", path.display());
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
for chunk_result in chunks {
|
||||
let chunk = chunk_result.unwrap_or_else(|e| {
|
||||
eprintln!("read error: {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
let records = parse_chunk(&chunk, k);
|
||||
if records.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let batch = QueryBatch::from_records(records, k, 6, 0.7);
|
||||
let n_seqs = batch.ids.len();
|
||||
|
||||
let mut accs: Vec<SeqAcc> = (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
|
||||
|
||||
let by_part = batch.split_by_partition(n_partitions);
|
||||
|
||||
for (part_idx, part_sks) in by_part.iter().enumerate() {
|
||||
if part_sks.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let kmer_results = idx
|
||||
.partition()
|
||||
.query_partition(part_idx, part_sks, k, n_genomes, with_counts)
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("query error on partition {part_idx}: {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
let presence = args.force_presence || !with_counts;
|
||||
let threshold = args.presence_threshold;
|
||||
|
||||
for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) {
|
||||
let descs = batch.map.get(*rsk).expect("rsk must be in map");
|
||||
for desc in descs {
|
||||
let acc = &mut accs[desc.seq_idx as usize];
|
||||
for hit in sk_kmer_results.iter() {
|
||||
match hit {
|
||||
None => acc.kmer_missing += 1,
|
||||
Some(row) => {
|
||||
acc.kmer_count += 1;
|
||||
for (g, &v) in row.iter().enumerate() {
|
||||
acc.genome_totals[g] += if presence {
|
||||
u32::from(v >= threshold)
|
||||
} else {
|
||||
v
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
emit_batch(&batch, &accs, idx.meta(), args.count_missing, &mut out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Output ────────────────────────────────────────────────────────────────────
|
||||
|
||||
fn emit_batch(
|
||||
batch: &QueryBatch,
|
||||
accs: &[SeqAcc],
|
||||
meta: &obikindex::meta::IndexMeta,
|
||||
count_missing: bool,
|
||||
out: &mut impl Write,
|
||||
) {
|
||||
for (seq_idx, (id, seq)) in batch.ids.iter().zip(batch.seqs.iter()).enumerate() {
|
||||
let acc = &accs[seq_idx];
|
||||
let mut ann = serde_json::Map::new();
|
||||
|
||||
ann.insert("kmer_count".into(), acc.kmer_count.into());
|
||||
if count_missing {
|
||||
ann.insert("kmer_missing".into(), acc.kmer_missing.into());
|
||||
}
|
||||
let mut match_map = serde_json::Map::new();
|
||||
for (g, label) in meta.genomes.iter().enumerate() {
|
||||
match_map.insert(label.clone(), acc.genome_totals[g].into());
|
||||
}
|
||||
ann.insert("kmer_strict_matches".into(), match_map.into());
|
||||
|
||||
let ann_str = serde_json::to_string(&ann).unwrap_or_else(|_| "{}".to_string());
|
||||
|
||||
// OBITools4 FASTA format: >id {"key":value,...}
|
||||
let _ = writeln!(out, ">{id} {ann_str}");
|
||||
let _ = out.write_all(seq);
|
||||
let _ = out.write_all(b"\n");
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,8 @@ enum Commands {
|
||||
Merge(cmd::merge::MergeArgs),
|
||||
/// Filter and compact an existing index into a new single-layer index
|
||||
Rebuild(cmd::rebuild::RebuildArgs),
|
||||
/// Query an index with sequences and annotate matches
|
||||
Query(cmd::query::QueryArgs),
|
||||
/// Dump all indexed kmers as CSV (kmer + per-genome counts or presence)
|
||||
Dump(cmd::dump::DumpArgs),
|
||||
/// Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees
|
||||
@@ -54,6 +56,7 @@ fn main() {
|
||||
Commands::Merge(args) => cmd::merge::run(args),
|
||||
Commands::Dump(args) => cmd::dump::run(args),
|
||||
Commands::Rebuild(args) => cmd::rebuild::run(args),
|
||||
Commands::Query(args) => cmd::query::run(args),
|
||||
Commands::Distance(args) => cmd::distance::run(args),
|
||||
Commands::Unitig(args) => cmd::unitig::run(args),
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ mod index_layer;
|
||||
mod kmer_sort;
|
||||
mod merge_layer;
|
||||
mod partition;
|
||||
mod query_layer;
|
||||
mod rebuild_layer;
|
||||
|
||||
pub use filter::KmerFilter;
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
use std::path::Path;
|
||||
|
||||
use obicompactvec::{PersistentBitMatrix, PersistentCompactIntMatrix};
|
||||
use obikseq::{CanonicalKmer, RoutableSuperKmer};
|
||||
use obiskio::{SKError, SKResult};
|
||||
use obilayeredmap::{MphfLayer, OLMError};
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
|
||||
use crate::partition::KmerPartition;
|
||||
|
||||
const INDEX_SUBDIR: &str = "index";
|
||||
|
||||
fn olm_to_sk(e: OLMError) -> SKError {
|
||||
match e {
|
||||
OLMError::Io(io_err) => SKError::Io(io_err),
|
||||
other => SKError::InvalidData { context: "query", detail: other.to_string() },
|
||||
}
|
||||
}
|
||||
|
||||
// ── per-layer query handle ────────────────────────────────────────────────────
|
||||
|
||||
enum QueryLayer {
|
||||
/// Layer<()> — MPHF-only, no data matrix; all indexed kmers map to 1 per genome.
|
||||
SetOnly(MphfLayer),
|
||||
Presence(MphfLayer, PersistentBitMatrix),
|
||||
Count(MphfLayer, PersistentCompactIntMatrix),
|
||||
}
|
||||
|
||||
impl QueryLayer {
|
||||
fn open(layer_dir: &Path, with_counts: bool) -> SKResult<Self> {
|
||||
let presence_dir = layer_dir.join("presence");
|
||||
let counts_dir = layer_dir.join("counts");
|
||||
|
||||
if with_counts && counts_dir.exists() {
|
||||
let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?;
|
||||
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?;
|
||||
Ok(QueryLayer::Count(mphf, mat))
|
||||
} else if presence_dir.exists() {
|
||||
let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?;
|
||||
let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?;
|
||||
Ok(QueryLayer::Presence(mphf, mat))
|
||||
} else if counts_dir.exists() {
|
||||
// presence query on a count index — return counts as-is
|
||||
let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?;
|
||||
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?;
|
||||
Ok(QueryLayer::Count(mphf, mat))
|
||||
} else {
|
||||
let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?;
|
||||
Ok(QueryLayer::SetOnly(mphf))
|
||||
}
|
||||
}
|
||||
|
||||
/// Return `Some(per-genome row)` if `kmer` is indexed in this layer, else `None`.
|
||||
fn find(&self, kmer: CanonicalKmer, n_genomes: usize) -> Option<Box<[u32]>> {
|
||||
match self {
|
||||
QueryLayer::SetOnly(mphf) => {
|
||||
mphf.find(kmer)
|
||||
.map(|_| vec![1u32; n_genomes].into_boxed_slice())
|
||||
}
|
||||
QueryLayer::Presence(mphf, mat) => {
|
||||
mphf.find(kmer)
|
||||
.map(|slot| mat.row(slot).iter().map(|&b| b as u32).collect())
|
||||
}
|
||||
QueryLayer::Count(mphf, mat) => {
|
||||
mphf.find(kmer).map(|slot| mat.row(slot))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── KmerPartition::query_partition ───────────────────────────────────────────
|
||||
|
||||
impl KmerPartition {
|
||||
/// Query a single partition for a slice of (already-routed) super-kmers.
|
||||
///
|
||||
/// Returns one entry per input super-kmer; each entry is a `Vec` with one
|
||||
/// `Option<Box<[u32]>>` per k-mer inside that super-kmer:
|
||||
/// - `None` — k-mer absent from the index
|
||||
/// - `Some(row)` — per-genome count (count index) or 0/1 (presence index)
|
||||
///
|
||||
/// All `superkmers` must belong to this partition (same minimizer bucket).
|
||||
pub fn query_partition(
|
||||
&self,
|
||||
part_idx: usize,
|
||||
superkmers: &[&RoutableSuperKmer],
|
||||
k: usize,
|
||||
n_genomes: usize,
|
||||
with_counts: bool,
|
||||
) -> SKResult<Vec<Vec<Option<Box<[u32]>>>>> {
|
||||
if superkmers.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let index_dir = self.part_dir(part_idx).join(INDEX_SUBDIR);
|
||||
|
||||
if !index_dir.exists() {
|
||||
return Ok(superkmers
|
||||
.iter()
|
||||
.map(|rsk| vec![None; rsk.seql() - k + 1])
|
||||
.collect());
|
||||
}
|
||||
|
||||
let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?;
|
||||
let layers: Vec<QueryLayer> = (0..meta.n_layers)
|
||||
.map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts))
|
||||
.collect::<SKResult<_>>()?;
|
||||
|
||||
Ok(superkmers
|
||||
.iter()
|
||||
.map(|rsk| {
|
||||
rsk.superkmer()
|
||||
.iter_canonical_kmers()
|
||||
.map(|kmer| {
|
||||
layers.iter().find_map(|layer| layer.find(kmer, n_genomes))
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
@@ -71,6 +71,20 @@ impl RoutableSuperKmer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for RoutableSuperKmer {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.superkmer == other.superkmer
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for RoutableSuperKmer {}
|
||||
|
||||
impl std::hash::Hash for RoutableSuperKmer {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
self.superkmer.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl Sequence for RoutableSuperKmer {
|
||||
type Canonical = RoutableSuperKmer;
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ mod mimetype;
|
||||
pub mod normalize;
|
||||
mod path_iterator;
|
||||
pub mod peakreader;
|
||||
pub mod record;
|
||||
pub mod xopen;
|
||||
|
||||
pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks,
|
||||
|
||||
@@ -0,0 +1,222 @@
|
||||
//! Per-sequence record parser for FASTA and FASTQ chunks.
|
||||
//!
|
||||
//! Same automaton structure as `normalize.rs` — only the actions differ:
|
||||
//! instead of writing into a single flat rope, we accumulate per-sequence
|
||||
//! data (id, raw ASCII, normalised ACGT\x00 rope).
|
||||
|
||||
use obikrope::{ForwardCursor, Rope, RopeCursor};
|
||||
|
||||
/// One sequence record extracted from a FASTA or FASTQ chunk.
|
||||
pub struct SeqRecord {
|
||||
/// Sequence identifier (everything before the first space in the header).
|
||||
pub id: String,
|
||||
/// Raw sequence bytes, newlines stripped, non-ACGT characters preserved.
|
||||
/// Reproduced verbatim in query output.
|
||||
pub sequence: Vec<u8>,
|
||||
/// Per-sequence normalised rope: uppercase ACGT segments of length ≥ k
|
||||
/// separated by `\x00`. Ready for `SuperKmerIter`.
|
||||
pub normalized: Rope,
|
||||
}
|
||||
|
||||
/// Parse all records from a FASTA or FASTQ chunk rope.
|
||||
/// Returns an empty vec if the rope carries no recognised mime type.
|
||||
pub fn parse_chunk(rope: &Rope, k: usize) -> Vec<SeqRecord> {
|
||||
let cursor = rope.fw_cursor();
|
||||
match rope.mime_type() {
|
||||
Some("text/fasta") => parse_fasta(cursor, k),
|
||||
Some("text/fastq") => parse_fastq(cursor, k),
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
|
||||
// ── Shared state accumulated while scanning one sequence ──────────────────────
|
||||
|
||||
struct RecordBuilder {
|
||||
id: String,
|
||||
sequence: Vec<u8>, // raw ASCII, no newlines
|
||||
norm: Vec<u8>, // ACGT\x00 segments being built
|
||||
seg_start: usize, // index in norm where current segment started
|
||||
k: usize,
|
||||
}
|
||||
|
||||
impl RecordBuilder {
|
||||
fn new(k: usize) -> Self {
|
||||
Self { id: String::new(), sequence: Vec::new(), norm: Vec::new(), seg_start: 0, k }
|
||||
}
|
||||
|
||||
fn reset(&mut self, id: String) {
|
||||
self.id = id;
|
||||
self.sequence.clear();
|
||||
self.norm.clear();
|
||||
self.seg_start = 0;
|
||||
}
|
||||
|
||||
/// Push one accepted ACGT byte.
|
||||
fn push_acgt(&mut self, b: u8) {
|
||||
self.sequence.push(b);
|
||||
self.norm.push(b);
|
||||
}
|
||||
|
||||
/// Push one non-ACGT byte to the raw sequence only (not to the norm buffer).
|
||||
fn push_raw(&mut self, b: u8) {
|
||||
self.sequence.push(b);
|
||||
}
|
||||
|
||||
/// Close the current ACGT segment (same logic as `end_segment` in normalize.rs).
|
||||
fn end_segment(&mut self) {
|
||||
if self.norm.len() - self.seg_start >= self.k {
|
||||
self.norm.push(0x00);
|
||||
self.seg_start = self.norm.len();
|
||||
} else {
|
||||
self.norm.truncate(self.seg_start);
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume into a SeqRecord. Closes any open segment first.
|
||||
fn finish(mut self) -> Option<SeqRecord> {
|
||||
self.end_segment();
|
||||
if self.id.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let mut rope = Rope::new(None);
|
||||
if !self.norm.is_empty() {
|
||||
rope.push(self.norm);
|
||||
}
|
||||
Some(SeqRecord { id: self.id, sequence: self.sequence, normalized: rope })
|
||||
}
|
||||
}
|
||||
|
||||
// ── FASTA automaton ───────────────────────────────────────────────────────────
|
||||
|
||||
fn parse_fasta(cursor: ForwardCursor<'_>, k: usize) -> Vec<SeqRecord> {
|
||||
let mut records: Vec<SeqRecord> = Vec::new();
|
||||
let mut builder = RecordBuilder::new(k);
|
||||
|
||||
// skip up to (and including) the first '>'
|
||||
loop {
|
||||
match cursor.read_next().ok() {
|
||||
None => return records,
|
||||
Some(b'>') => break,
|
||||
Some(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
// read first id — read_id already consumes the full header line
|
||||
builder.id = read_id(&cursor);
|
||||
|
||||
loop {
|
||||
match cursor.read_next().ok() {
|
||||
None => {
|
||||
// EOF — close final segment and emit
|
||||
if let Some(rec) = builder.finish() {
|
||||
records.push(rec);
|
||||
}
|
||||
return records;
|
||||
}
|
||||
Some(b'\n') | Some(b'\r') => {
|
||||
// peek: next non-empty char determines if new record starts
|
||||
match cursor.read_ahead(1).ok() {
|
||||
Some(b'>') => {
|
||||
// end of current record
|
||||
builder.end_segment();
|
||||
if let Some(rec) = builder.finish() {
|
||||
records.push(rec);
|
||||
}
|
||||
cursor.read_next().ok(); // consume '>'
|
||||
let id = read_id(&cursor); // already consumes header line
|
||||
builder = RecordBuilder::new(k);
|
||||
builder.reset(id);
|
||||
}
|
||||
None => {
|
||||
builder.end_segment();
|
||||
if let Some(rec) = builder.finish() {
|
||||
records.push(rec);
|
||||
}
|
||||
return records;
|
||||
}
|
||||
Some(_) => {} // continuation line — do nothing
|
||||
}
|
||||
}
|
||||
Some(b) => {
|
||||
let upper = b & !0x20u8;
|
||||
if matches!(upper, b'A' | b'C' | b'G' | b'T') {
|
||||
builder.push_acgt(upper);
|
||||
} else {
|
||||
builder.push_raw(b);
|
||||
builder.end_segment();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── FASTQ automaton ───────────────────────────────────────────────────────────
|
||||
|
||||
fn parse_fastq(cursor: ForwardCursor<'_>, k: usize) -> Vec<SeqRecord> {
|
||||
let mut records: Vec<SeqRecord> = Vec::new();
|
||||
|
||||
loop {
|
||||
// find '@'
|
||||
loop {
|
||||
match cursor.read_next().ok() {
|
||||
None => return records,
|
||||
Some(b'@') => break,
|
||||
Some(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
let mut builder = RecordBuilder::new(k);
|
||||
builder.id = read_id(&cursor); // already consumes the full header line
|
||||
|
||||
// sequence line — stop at newline, non-ACGT breaks segment
|
||||
loop {
|
||||
match cursor.read_next().ok() {
|
||||
None | Some(b'\n') | Some(b'\r') => {
|
||||
builder.end_segment();
|
||||
break;
|
||||
}
|
||||
Some(b) => {
|
||||
let upper = b & !0x20u8;
|
||||
if matches!(upper, b'A' | b'C' | b'G' | b'T') {
|
||||
builder.push_acgt(upper);
|
||||
} else {
|
||||
builder.push_raw(b);
|
||||
builder.end_segment();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
skip_line(&cursor); // '+' line
|
||||
skip_line(&cursor); // quality line
|
||||
|
||||
if let Some(rec) = builder.finish() {
|
||||
records.push(rec);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
fn read_id(cursor: &ForwardCursor<'_>) -> String {
|
||||
let mut id = Vec::new();
|
||||
loop {
|
||||
match cursor.read_next().ok() {
|
||||
None | Some(b'\n') | Some(b'\r') => break,
|
||||
Some(b' ') | Some(b'\t') => {
|
||||
skip_line(cursor);
|
||||
break;
|
||||
}
|
||||
Some(b) => id.push(b),
|
||||
}
|
||||
}
|
||||
String::from_utf8_lossy(&id).into_owned()
|
||||
}
|
||||
|
||||
fn skip_line(cursor: &ForwardCursor<'_>) {
|
||||
while let Some(c) = cursor.read_next().ok() {
|
||||
if c == b'\n' {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user