Push vqmnuyrkpxot #9
@@ -8,6 +8,9 @@ Ne modifier aucun fichier à moins d'une demande explicite de modification. En p
|
|||||||
**Règle absolue : ne jamais substituer une dépendance ou une bibliothèque sans validation explicite.**
|
**Règle absolue : ne jamais substituer une dépendance ou une bibliothèque sans validation explicite.**
|
||||||
Si une dépendance demandée pose problème (erreur de compilation, bug, API manquante), exposer le problème et proposer des alternatives — ne jamais switcher silencieusement vers une autre bibliothèque. Le choix des dépendances est une décision d'architecture qui appartient au développeur.
|
Si une dépendance demandée pose problème (erreur de compilation, bug, API manquante), exposer le problème et proposer des alternatives — ne jamais switcher silencieusement vers une autre bibliothèque. Le choix des dépendances est une décision d'architecture qui appartient au développeur.
|
||||||
|
|
||||||
|
**Règle absolue : le code existant est une hypothèse, pas une vérité.**
|
||||||
|
Quand une nouvelle construction (type, itérateur, abstraction) rend du code historique injustifié, le signaler immédiatement et proposer de le supprimer — ne pas conserver les deux en parallèle par inertie. Le développeur demande explicitement de remettre en cause le code base : ne pas attendre qu'il insiste.
|
||||||
|
|
||||||
Tu maintiens en **anglais**, dense et sans remplissage, les documents suivants :
|
Tu maintiens en **anglais**, dense et sans remplissage, les documents suivants :
|
||||||
- `docmd/index.md` — document de discussion de base, enrichi progressivement au fil de nos échanges ; il reflète l'état courant de la réflexion sur le projet
|
- `docmd/index.md` — document de discussion de base, enrichi progressivement au fil de nos échanges ; il reflète l'état courant de la réflexion sur le projet
|
||||||
- les autres fichiers Markdown dans `docmd/` selon leur thème respectif
|
- les autres fichiers Markdown dans `docmd/` selon leur thème respectif
|
||||||
|
|||||||
+81
-48
@@ -19,34 +19,80 @@ Given a set of query sequences, determine for each sequence how many of its k-me
|
|||||||
The query follows the same superkmer-based partitioning strategy used at indexing time.
|
The query follows the same superkmer-based partitioning strategy used at indexing time.
|
||||||
|
|
||||||
```
|
```
|
||||||
for each query sequence:
|
for each batch of sequences:
|
||||||
decompose into superkmers (non-ACGT breaks, same minimiser scheme as indexing)
|
build QueryBatch: decompose all sequences into superkmers, deduplicate
|
||||||
for each superkmer:
|
split superkmers by partition via minimiser hash
|
||||||
route to partition p via minimiser hash
|
for each partition p:
|
||||||
for each kmer in the superkmer:
|
query_partition(p, superkmers_routed_to_p)
|
||||||
lookup kmer in partition p (MPHF → evidence check → matrix row)
|
→ load QueryLayer(s) for p
|
||||||
accumulate result into per-sequence accumulators
|
→ for each kmer in each superkmer: MphfLayer::find(kmer)
|
||||||
emit annotated sequence
|
broadcast results back to each (seq_idx, kmer_offset) that referenced the superkmer
|
||||||
|
emit annotated sequences
|
||||||
```
|
```
|
||||||
|
|
||||||
Parallelism is **per sequence**: each worker thread handles all partitions of one sequence independently. This avoids cross-thread coordination when merging partial results and keeps memory usage proportional to the number of concurrent sequences rather than to the number of partitions.
|
Superkmers that appear more than once in the batch (same sequence or across sequences) are deduplicated: each unique `RoutableSuperKmer` is queried once per partition, and the result is broadcast to every `SKDesc` entry that references it.
|
||||||
|
|
||||||
|
Parallelism is **not yet active** in the current implementation: batches are processed sequentially on a single thread despite the `--threads` flag being parsed. The `QueryBatch` / `split_by_partition` design is structured to support per-partition parallelism in a future iteration.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Exact vs. approximate matching
|
## Layer lookup: `MphfLayer::find`
|
||||||
|
|
||||||
### Exact (default)
|
`MphfLayer::open` reads `layer_meta.json` and loads either exact or approximate evidence. The caller (`QueryLayer::find`) never chooses the dispatch path — it is fixed at open time by `LayerEvidence`:
|
||||||
|
|
||||||
Standard MPHF lookup followed by evidence check. O(1) per k-mer.
|
```rust
|
||||||
|
pub fn find(&self, kmer: CanonicalKmer) -> Option<usize> {
|
||||||
|
match &self.ev {
|
||||||
|
LayerEvidence::Exact { .. } => self.find_exact(kmer),
|
||||||
|
LayerEvidence::Approx { .. } => self.find_approx(kmer),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### 1-mismatch (`--mismatch` flag)
|
### Exact layers
|
||||||
|
|
||||||
For each k-mer of the query, generate all `3·k` single-substitution variants. Each variant is canonicalised and looked up independently in the index. If one or more variants are found, their per-genome rows are **summed** into the result for that k-mer position.
|
`find_exact` maps the k-mer through the MPHF to a slot, then calls `UnitigFileReader::verify_canonical_kmer(chunk_id, rank, kmer)` to confirm the stored k-mer matches. Zero false positives. Requires `UnitigFileReader::open()` (random-access via `.idx`); `open_sequential()` cannot serve random-access verification.
|
||||||
|
|
||||||
- If a k-mer matches exactly AND one of its variants also matches (distinct k-mers in the index), both contributions are accumulated.
|
### Approximate layers
|
||||||
- Exact and approximate matches are tracked separately in the output (see annotation schema below).
|
|
||||||
- The superkmer routing optimisation is **not** used in 1-mismatch mode: each variant is looked up directly via its own minimiser.
|
`find_approx` maps the k-mer through the MPHF, then checks a stored `b`-bit fingerprint of the canonical hash. False-positive rate: **1/2^b per k-mer query**. No `.idx` file is needed; the layer carries only `fingerprint.bin`.
|
||||||
- Cost: up to `3·k` MPHF probes per k-mer position vs. 1 in exact mode.
|
|
||||||
|
For a query window of z consecutive k-mers (Findere scheme), the false-positive rate per window is **1/2^(b·z)**. The `z` parameter is recorded in `layer_meta.json` at build time but is not enforced during querying — the caller is responsible for interpreting window-level results accordingly.
|
||||||
|
|
||||||
|
### `QueryLayer` variant selection
|
||||||
|
|
||||||
|
`QueryLayer::open` in `query_layer.rs` selects the data matrix to pair with `MphfLayer`:
|
||||||
|
|
||||||
|
| Condition | Variant | Data returned per k-mer |
|
||||||
|
|---|---|---|
|
||||||
|
| `with_counts=true` and `counts/` exists | `Count` | raw count per genome |
|
||||||
|
| `presence/` exists | `Presence` | 0/1 per genome (bit matrix) |
|
||||||
|
| only `counts/` exists | `Count` | counts used as-is |
|
||||||
|
| neither exists | `SetOnly` | 1 for every genome |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## `open()` vs `open_sequential()`
|
||||||
|
|
||||||
|
`UnitigFileReader::open()` loads the `.idx` block-offset table, enabling random access to individual unitig chunks. It is required whenever `verify_canonical_kmer` is called (exact layers at query time).
|
||||||
|
|
||||||
|
`UnitigFileReader::open_sequential()` skips the `.idx` and supports only forward iteration. It is sufficient for:
|
||||||
|
- build passes that scan all unitigs sequentially (`build_exact_evidence`, `build_approx_evidence`);
|
||||||
|
- the `unitig` subcommand, which iterates and prints unitigs without random access.
|
||||||
|
|
||||||
|
`KmerIndex::open()` (called by `query::run`) triggers `MphfLayer::open` for each layer, which calls `UnitigFileReader::open()` for exact layers. Approximate layers do not open a unitig reader at all.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Presence / count mode at query time
|
||||||
|
|
||||||
|
The `--force-presence` flag and `--presence-threshold` control how per-genome values are accumulated, independently of what the index stores:
|
||||||
|
|
||||||
|
```
|
||||||
|
genome_totals[g] += if presence { u32::from(v >= threshold) } else { v }
|
||||||
|
```
|
||||||
|
|
||||||
|
`presence` is true when `--force-presence` is set or when the index has no counts (`!with_counts`). The default `presence_threshold` is 1, so any nonzero count counts as a match.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -55,57 +101,44 @@ For each k-mer of the query, generate all `3·k` single-substitution variants. E
|
|||||||
Output sequences are written in **OBITools4 format**: the original sequence with a JSON annotation map in the title line.
|
Output sequences are written in **OBITools4 format**: the original sequence with a JSON annotation map in the title line.
|
||||||
|
|
||||||
```
|
```
|
||||||
>read_id {"kmer_total":150,"kmer_found":59,...}
|
>read_id {"kmer_count":59,"kmer_strict_matches":{"genome_a":42,"genome_b":7,...}}
|
||||||
ATCGATCG...
|
ATCGATCG...
|
||||||
```
|
```
|
||||||
|
|
||||||
Genome order in all list-valued annotations follows the genome order recorded in `index.meta`.
|
Genome keys in `kmer_strict_matches` are genome labels from `index.meta`. Key order follows iteration order of `meta.genomes`.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Annotation schema
|
## Annotation schema (current implementation)
|
||||||
|
|
||||||
### Summary mode (default)
|
|
||||||
|
|
||||||
| Key | Type | Condition | Semantics |
|
| Key | Type | Condition | Semantics |
|
||||||
|---|---|---|---|
|
|---|---|---|---|
|
||||||
| `kmer_total` | int | always | total k-mers in the (masked) sequence |
|
| `kmer_count` | int | always | k-mers with at least one match |
|
||||||
| `kmer_found` | int | always | k-mers with at least one match (exact or approx) |
|
| `kmer_missing` | int | `--count-missing` | k-mers absent from every layer |
|
||||||
| `kmer_missing` | int | `--count-missing` | k-mers absent from the index |
|
| `kmer_strict_matches` | object | always | per-genome accumulated value (label → count or 0/1) |
|
||||||
| `kmer_match` | list[int] | always | per-genome matched k-mer count (exact + approx) |
|
|
||||||
| `kmer_match_exact` | list[int] | `--mismatch` | per-genome exact match count |
|
|
||||||
| `kmer_match_approx` | list[int] | `--mismatch` | per-genome approx match count |
|
|
||||||
| `count_match` | list[int] | count index | per-genome sum of index counts for matched k-mers |
|
|
||||||
|
|
||||||
`kmer_match[i]` is the number of k-mer positions in the query that contribute at least one match to genome i. In 1-mismatch mode, a single k-mer position can contribute to multiple genomes if several of its variants are present in the index.
|
`kmer_count` counts matched k-mer positions (incremented once per `Some(row)` hit regardless of how many genomes are covered). `kmer_missing` counts `None` hits.
|
||||||
|
|
||||||
`count_match[i]` sums raw index counts across all matched k-mer positions for genome i. Only meaningful for count indexes.
|
**Note on doc/impl divergence:** the doc previously used keys `kmer_total`, `kmer_found`, and `kmer_match` (list). The implementation uses `kmer_count` (int, matched only) and `kmer_strict_matches` (object keyed by genome label). `--mismatch` and `--detail` are parsed but not yet implemented and emit a warning.
|
||||||
|
|
||||||
### Detail mode (`--detail`)
|
|
||||||
|
|
||||||
All summary keys, plus per-position coverage vectors — one list per genome, length `len(sequence) − k + 1`:
|
|
||||||
|
|
||||||
| Key | Type | Condition | Semantics |
|
|
||||||
|---|---|---|---|
|
|
||||||
| `cov_<i>` | list[int] | `--detail` | coverage at each k-mer position for genome i; raw count (count index) or 0/1 (presence index); 0 if absent |
|
|
||||||
| `cov_exact_<i>` | list[int] | `--detail` + `--mismatch` | exact-match contribution per position |
|
|
||||||
| `cov_approx_<i>` | list[int] | `--detail` + `--mismatch` | approx-match contribution per position |
|
|
||||||
|
|
||||||
Genome indices in key names are 0-based integers matching the `index.meta` genome order. Genome labels are not used as key names to avoid issues with special characters in long or complex genome identifiers.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## CLI
|
## CLI
|
||||||
|
|
||||||
```
|
```
|
||||||
obikmer query -i <index> [--summary | --detail] [--mismatch] [--count-missing] <query.fa>
|
obikmer query -i <index> [--detail] [--mismatch] [--count-missing]
|
||||||
|
[--force-presence] [--presence-threshold <n>]
|
||||||
|
[-T <threads>] <query.fa> [<query2.fa> ...]
|
||||||
```
|
```
|
||||||
|
|
||||||
`--summary` is the default; `--detail` implies `--summary` (all summary keys are always present).
|
`--mismatch` and `--detail` are accepted but currently ignored with a warning on stderr.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Future work
|
## Future work
|
||||||
|
|
||||||
- **Read classification** (`--classify`): assign each read to the genome with the highest `kmer_match` score; emit as a single annotation key.
|
- **`--mismatch`**: 1-mismatch approximate matching — generate `3·k` single-substitution variants per k-mer, look each up independently.
|
||||||
- **Whitelist / blacklist filtering**: accept or reject sequences based on whether their k-mer match score for a designated set of genomes exceeds a threshold.
|
- **`--detail`**: per-position coverage vectors (`cov_<i>`) per genome.
|
||||||
|
- **Read classification** (`--classify`): assign each read to the genome with the highest match score.
|
||||||
|
- **Parallelism**: activate per-partition or per-sequence worker threads using the already-parsed `--threads` value.
|
||||||
|
- **Whitelist / blacklist filtering**: threshold-based accept/reject on per-genome match scores.
|
||||||
|
|||||||
@@ -2,19 +2,11 @@
|
|||||||
|
|
||||||
The `obiread` crate provides a streaming iterator that reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. Chunks are consumed in parallel by downstream workers.
|
The `obiread` crate provides a streaming iterator that reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. Chunks are consumed in parallel by downstream workers.
|
||||||
|
|
||||||
## Output type: rope
|
## Output type: Rope
|
||||||
|
|
||||||
Each chunk is a `Vec<Bytes>` — a **rope**: a list of reference-counted byte slices that are not necessarily contiguous in memory. The consumer iterates over the slices in order.
|
Each chunk is a `Rope` — a segmented byte sequence: a `Vec` of blocks, where each block is a `Vec<Cell<u8>>`. The consumer iterates over the blocks via a forward or backward cursor.
|
||||||
|
|
||||||
Using `bytes::Bytes` means the split at the record boundary is O(1): `Bytes::split_to(n)` adjusts a reference counter, not memory. No `memcpy` in the common case.
|
`Rope::split_off(pos)` splits at an absolute byte offset in O(log n) (binary search over block-start index). If `pos` falls inside a block, that block is split in two via `Vec::split_off` — no `memcpy` in the common case.
|
||||||
|
|
||||||
## Allocation policy
|
|
||||||
|
|
||||||
| Case | Cost |
|
|
||||||
|------|------|
|
|
||||||
| Boundary found in the current block (common) | zero extra allocation — `split_to` only |
|
|
||||||
| Boundary straddles multiple blocks (sequence > block size, rare) | one allocation to pack the rope into a flat buffer |
|
|
||||||
| EOF flush | zero extra allocation |
|
|
||||||
|
|
||||||
## SeqChunkIter
|
## SeqChunkIter
|
||||||
|
|
||||||
@@ -22,7 +14,7 @@ Using `bytes::Bytes` means the split at the record boundary is O(1): `Bytes::spl
|
|||||||
pub struct SeqChunkIter<R: Read> { /* private */ }
|
pub struct SeqChunkIter<R: Read> { /* private */ }
|
||||||
|
|
||||||
impl<R: Read> Iterator for SeqChunkIter<R> {
|
impl<R: Read> Iterator for SeqChunkIter<R> {
|
||||||
type Item = io::Result<Vec<Bytes>>;
|
type Item = io::Result<Rope>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R>
|
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R>
|
||||||
@@ -32,22 +24,21 @@ pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R>
|
|||||||
`next()` loop:
|
`next()` loop:
|
||||||
|
|
||||||
```text
|
```text
|
||||||
1. read one block of block_size bytes → push onto rope
|
1. read one block of block_size bytes → push onto Rope
|
||||||
2. probe check: if the boundary marker ("\n>" or "\n@") is absent from the
|
2. call splitter(rope) → Option<abs_offset>
|
||||||
last block, skip the splitter (avoids a full backward scan for nothing)
|
if Some(pos):
|
||||||
3. call splitter on last block
|
tail = rope.split_off(pos) ← O(log n), may split one block
|
||||||
if found at offset n:
|
chunk = mem::replace(&mut rope, tail)
|
||||||
remainder = last_block.split_to(n) ← O(1), zero copy
|
return Some(Ok(chunk))
|
||||||
return std::mem::take(&mut self.rope) ← the chunk
|
3. if EOF and rope non-empty: return Some(Ok(rope)) as final chunk
|
||||||
4. if rope.len() > 1 (multi-block accumulation):
|
4. if EOF and rope empty: return None
|
||||||
pack rope into one flat buffer ← one alloc
|
|
||||||
retry splitter on flat buffer
|
|
||||||
5. if EOF: flush remaining rope as final chunk
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
The `Splitter` function signature is `fn(&Rope) -> Option<usize>`. It returns the absolute byte offset of the start of the last complete record, or `None` if no boundary was found in the accumulated rope (need more data).
|
||||||
|
|
||||||
## Boundary detection — FASTA
|
## Boundary detection — FASTA
|
||||||
|
|
||||||
Backward scan with a 2-state machine. Searches for `>` immediately preceded by `\n` or `\r`:
|
Backward scan with a 2-state machine. Searches (right to left) for `>` followed by `\n` or `\r` (i.e., a `>` that is preceded by a newline in forward order):
|
||||||
|
|
||||||
```mermaid
|
```mermaid
|
||||||
stateDiagram-v2
|
stateDiagram-v2
|
||||||
@@ -58,13 +49,13 @@ stateDiagram-v2
|
|||||||
FoundGt --> [*] : '\\n' / '\\r' ✓
|
FoundGt --> [*] : '\\n' / '\\r' ✓
|
||||||
```
|
```
|
||||||
|
|
||||||
Returns the byte offset of the `>` that starts the last complete record.
|
Returns the byte offset of the `>` that starts the last complete record. Returns `None` if only one `>` is found (cannot confirm there is a prior complete record).
|
||||||
|
|
||||||
## Boundary detection — FASTQ
|
## Boundary detection — FASTQ
|
||||||
|
|
||||||
FASTQ records have a rigid 4-line structure (`@header`, sequence, `+`, quality). The `@` character (ASCII 64, Phred score 31) can appear legitimately in quality lines, making any forward heuristic unreliable. The backward scanner verifies the full structural context before accepting a candidate `@`.
|
FASTQ records have a rigid 4-line structure (`@header`, sequence, `+`, quality). The `@` character (ASCII 64, Phred score 31) can appear legitimately in quality lines, making any forward heuristic unreliable. The backward scanner verifies the full structural context before accepting a candidate `@`.
|
||||||
|
|
||||||
7-state machine (port of Go's `EndOfLastFastqEntry`), scanning from **right to left**. Each time a `+` is found, its position is saved as `restart`; any state mismatch resets the scan to that position.
|
7-state machine (states 0–6), scanning from **right to left**. Each time a `+` is found, its position is saved as `restart`; any state mismatch resets the scan to that position.
|
||||||
|
|
||||||
```mermaid
|
```mermaid
|
||||||
stateDiagram-v2
|
stateDiagram-v2
|
||||||
|
|||||||
@@ -1,38 +1,57 @@
|
|||||||
# Kmer — implementation
|
# Kmer — implementation
|
||||||
|
|
||||||
## Memory layout
|
## Types and layout
|
||||||
|
|
||||||
`Kmer` is a `#[repr(transparent)]` newtype over `u64`:
|
`KmerOf<L>` is a `#[repr(transparent)]` newtype over `u64` parameterized by a `KmerLength` marker:
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
#[repr(transparent)]
|
#[repr(transparent)]
|
||||||
pub struct Kmer(u64);
|
pub struct KmerOf<L: KmerLength>(u64, PhantomData<L>);
|
||||||
```
|
```
|
||||||
|
|
||||||
Nucleotides are packed 2 bits each, **left-aligned**, MSB-first. Nucleotide 0 occupies bits 63–62; nucleotide i occupies bits 63−2i and 62−2i. The low 64−2k bits are always zero. k is **not stored** — it is a parameter of every operation that needs it, and will be owned by the future collection-level indexer.
|
Three marker types implement `KmerLength`:
|
||||||
|
|
||||||
|
| Marker | `len()` source | Used for |
|
||||||
|
|--------|---------------|---------|
|
||||||
|
| `KLen` | `params::k()` | k-mers |
|
||||||
|
| `MLen` | `params::m()` | minimizers |
|
||||||
|
| `ConstLen<N>` | const generic `N` | tests |
|
||||||
|
|
||||||
|
Public aliases:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub type Kmer = KmerOf<KLen>; // k-mer, global k
|
||||||
|
pub type Minimizer = CanonicalKmerOf<MLen>; // canonical m-mer, global m
|
||||||
|
```
|
||||||
|
|
||||||
|
Nucleotides are packed 2 bits each, **left-aligned**, MSB-first. Nucleotide 0 occupies bits 63–62; nucleotide i occupies bits 63−2i and 62−2i. The low 64−2·len bits are always zero. The length is **not stored** — every operation reads it from `L::len()`.
|
||||||
|
|
||||||
| 63–62 | 61–60 | … | 63−2(k−1)−1 to 63−2(k−1) | 63−2k down to 0 |
|
| 63–62 | 61–60 | … | 63−2(k−1)−1 to 63−2(k−1) | 63−2k down to 0 |
|
||||||
|-------|-------|---|--------------------------|-----------------|
|
|-------|-------|---|--------------------------|-----------------|
|
||||||
| nt 0 | nt 1 | … | nt k−1 | zero padding |
|
| nt 0 | nt 1 | … | nt k−1 | zero padding |
|
||||||
|
|
||||||
|
## Global parameters
|
||||||
|
|
||||||
|
`params::set_k(k)` / `params::k()` and `params::set_m(m)` / `params::m()` are backed by `OnceLock<usize>` in production (write-once, panic on conflict) and by `thread_local! { Cell<usize> }` in test builds (per-thread, freely writable). `params::init(k, m)` sets both in one call.
|
||||||
|
|
||||||
## Encoding
|
## Encoding
|
||||||
|
|
||||||
`Kmer::from_ascii(ascii, k)` encodes the first k bytes of an ASCII slice using the shared `ENC` table (see [SuperKmer — ASCII encoding](superkmer.md#ascii-encoding-and-decoding)):
|
`KmerOf::<L>::from_ascii(ascii)` encodes the first `L::len()` bytes using the shared `ENC` table (see [SuperKmer — ASCII encoding](superkmer.md#ascii-encoding-and-decoding)):
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
for i in 0..k {
|
for i in 0..k {
|
||||||
val = (val << 2) | encode_base(ascii[i]) as u64;
|
val = (val << 2) | encode_base(ascii[i]) as u64;
|
||||||
}
|
}
|
||||||
Kmer(val << (64 - 2 * k))
|
KmerOf(val << (64 - 2 * k), PhantomData)
|
||||||
```
|
```
|
||||||
|
|
||||||
Zero allocation — result lives on the stack.
|
Zero allocation — result lives on the stack.
|
||||||
|
|
||||||
## Decoding
|
## Decoding
|
||||||
|
|
||||||
`write_ascii(k, buf)` appends k ASCII characters to a caller-supplied `Vec<u8>` using the shared `DEC4` table: one lookup per 4 nucleotides, two partial-byte lookups for the remainder. No allocation in the hot path.
|
`write_ascii(writer)` writes k ASCII characters to any `W: Write` using the shared `DEC4` table: one lookup per 4 nucleotides, one partial lookup for the remainder. No allocation in the hot path.
|
||||||
|
|
||||||
`to_ascii(k)` is a convenience wrapper that allocates and returns a `Vec<u8>`; intended for tests and display only.
|
`to_ascii()` is a convenience wrapper that allocates and returns a `Vec<u8>`; intended for tests and display only.
|
||||||
|
|
||||||
## Reverse complement
|
## Reverse complement
|
||||||
|
|
||||||
@@ -43,18 +62,30 @@ let x = !self.0; /
|
|||||||
let x = x.swap_bytes(); // reverse bytes
|
let x = x.swap_bytes(); // reverse bytes
|
||||||
let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4); // swap nibbles
|
let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4); // swap nibbles
|
||||||
let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2); // swap 2-bit groups
|
let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2); // swap 2-bit groups
|
||||||
Kmer(x << (64 - 2 * k))
|
KmerOf(x << (64 - 2 * k), PhantomData)
|
||||||
```
|
```
|
||||||
|
|
||||||
After complementing, bytes are reversed (`swap_bytes`), then nibbles, then 2-bit groups — restoring 2-bit nucleotides to their correct positions in reverse order. A final left-shift realigns to MSB. Zero allocation — result lives on the stack.
|
After complementing, bytes are reversed (`swap_bytes`), then nibbles, then 2-bit groups — restoring 2-bit nucleotides to their correct positions in reverse order. A final left-shift realigns to MSB. Zero allocation — result lives on the stack.
|
||||||
|
|
||||||
## Canonical form
|
## Canonical form and `CanonicalKmerOf`
|
||||||
|
|
||||||
|
`canonical()` returns a `CanonicalKmerOf<L>` — a distinct newtype that carries the same `u64` layout but enforces the invariant that the stored value equals `min(kmer, revcomp)`:
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
pub fn canonical(&self, k: usize) -> Self {
|
pub fn canonical(&self) -> CanonicalKmerOf<L> {
|
||||||
let rc = self.revcomp(k);
|
let rc = self.revcomp();
|
||||||
if self.0 <= rc.0 { *self } else { rc }
|
CanonicalKmerOf(if self.0 <= rc.0 { self.0 } else { rc.0 }, PhantomData)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Lexicographic minimum of forward and reverse-complement, comparing the raw `u64` values directly (left-aligned encoding makes this equivalent to nucleotide-wise comparison). Zero allocation — result lives on the stack.
|
Lexicographic minimum of forward and reverse-complement, comparing the raw `u64` values directly (left-aligned encoding makes this equivalent to nucleotide-wise comparison). Zero allocation — result lives on the stack.
|
||||||
|
|
||||||
|
`CanonicalKmerOf::from_raw_unchecked(raw)` is the only other public constructor, for trusted paths such as deserialisation.
|
||||||
|
|
||||||
|
## Sliding window helpers
|
||||||
|
|
||||||
|
`push_right(nuc)` / `push_left(nuc)` shift the window by one base in O(1). `is_overlapping(other)` checks whether the last k−1 nucleotides of `self` equal the first k−1 of `other`.
|
||||||
|
|
||||||
|
## Hashing
|
||||||
|
|
||||||
|
`hash_kmer(raw: u64) -> u64` computes `mix64(raw ^ 0x9e3779b97f4a7c15)`, the seeded splitmix64 finalizer. `CanonicalKmerOf::seq_hash()` delegates to `hash_kmer`.
|
||||||
|
|||||||
@@ -26,13 +26,36 @@ Default mode is `Presence`. `Count` mode requires **all** source indexes to have
|
|||||||
All source indexes must satisfy:
|
All source indexes must satisfy:
|
||||||
|
|
||||||
- `IndexState::Indexed` (fully built — `index.done` sentinel present)
|
- `IndexState::Indexed` (fully built — `index.done` sentinel present)
|
||||||
- Same `kmer_size`, `minimizer_size`, `n_bits`
|
- Same `kmer_size`, `minimizer_size`, `n_partitions`
|
||||||
|
- Same evidence kind: all `Exact`, or all `Approx` with identical `(b, z)` parameters
|
||||||
- If `Count` mode: all sources must have `with_counts=true`
|
- If `Count` mode: all sources must have `with_counts=true`
|
||||||
|
|
||||||
`--force`: if the output directory already exists, it is deleted before the merge begins.
|
`--force`: if the output directory already exists, it is deleted before the merge begins.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## Evidence compatibility
|
||||||
|
|
||||||
|
`validate_evidence_compat(sources)` is called before any I/O. It compares each source's `EvidenceKind` against `sources[0]`:
|
||||||
|
|
||||||
|
- All `Exact` → accepted, output uses `Exact`
|
||||||
|
- All `Approx { b, z }` with same `(b, z)` → accepted, output uses those parameters
|
||||||
|
- Any other combination → `OKIError::IncompatibleEvidence`, with a message directing the user to run `reindex` first
|
||||||
|
|
||||||
|
Mixed exact/approx is a hard error, not a silent conversion.
|
||||||
|
|
||||||
|
```rust
|
||||||
|
fn validate_evidence_compat(sources: &[&KmerIndex]) -> OKIResult<EvidenceKind>
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Genome label deduplication
|
||||||
|
|
||||||
|
`compute_labels(sources, rename_duplicates)` assigns final genome labels across all sources before any file is written. The first occurrence of a label keeps the original name. Subsequent occurrences receive `.1`, `.2`, … suffixes when `rename_duplicates` is true, or trigger `OKIError::DuplicateGenomeLabel` otherwise.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## Algorithm
|
## Algorithm
|
||||||
|
|
||||||
### 1. Validation
|
### 1. Validation
|
||||||
@@ -41,36 +64,72 @@ Check all sources against the constraints above. Abort on any mismatch.
|
|||||||
|
|
||||||
### 2. Bootstrap output from first source
|
### 2. Bootstrap output from first source
|
||||||
|
|
||||||
Recursive file copy of `sources[0]` → `output`. This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genome is genome 0 in the output.
|
Recursive file copy of `sources[0]` → `output`. Immediately after the copy:
|
||||||
|
|
||||||
|
- `index.meta` is rewritten with the final genome list (all sources, possibly renamed) and the effective evidence kind.
|
||||||
|
- In `Presence` mode, any `counts/` directories inherited from source_0 are removed.
|
||||||
|
- `spectrums/` from source_0 is removed and rebuilt from scratch across all sources, applying the (possibly renamed) labels.
|
||||||
|
|
||||||
|
This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genomes occupy columns 0 … `n_dst_genomes - 1` in the destination.
|
||||||
|
|
||||||
### 3. For each subsequent source (parallel across partitions)
|
### 3. For each subsequent source (parallel across partitions)
|
||||||
|
|
||||||
For each partition, process one source at a time sequentially:
|
`KmerPartition::merge_partition(i, sources, mode, n_dst_genomes, block_bits)` is called for each partition index `i`. `block_bits` is taken from `dst.meta.config.block_bits`.
|
||||||
|
|
||||||
**a. Classify kmers**
|
Each entry in `sources` is `(&KmerPartition, n_genomes)` where `n_genomes` is the column count that source contributes (> 1 when the source is itself a merged index).
|
||||||
|
|
||||||
Iterate all kmers in the source partition (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`:
|
**First merge, Presence mode**: when `n_dst_genomes == 1`, `Layer::<()>::init_presence_matrix` is called on every existing destination layer before any source column is appended. This creates `presence/col_000000.pbiv` set all-true (genome 0 is present in every slot).
|
||||||
|
|
||||||
- **Hit** `(dst_layer, slot)`: record `(dst_layer, slot, value)` where `value` is the source count (Count mode) or `1` (Presence mode).
|
**Pass 1 — classify kmers**
|
||||||
- **Miss**: add kmer to a `GraphDeBruijn` accumulator; record its value in a `HashMap<CanonicalKmer, Vec<u32>>`.
|
|
||||||
|
|
||||||
**b. Extend existing layers**
|
Iterate all kmers from all source partitions (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`:
|
||||||
|
|
||||||
For each existing layer in the destination partition, call `append_genome_column` once per source genome being merged. Slots that received a hit are populated with their recorded value; all other slots receive 0 (absent).
|
- **Hit**: kmer already in the destination; record for Pass 2.
|
||||||
|
- **Miss**: push kmer into a `GraphDeBruijn` accumulator.
|
||||||
|
|
||||||
If this is the **first merge** and operating in Presence mode, call `Layer<()>::init_presence_matrix` before appending any source column. This creates `presence/` with `col_000000.pbiv` set all-true (genome 0 is present in every slot of this layer).
|
**New layer construction**
|
||||||
|
|
||||||
**c. Build new layer for new kmers**
|
If the accumulator is non-empty, compute de Bruijn unitigs and call `Layer::<()>::build(&new_layer_dir, block_bits)`. All kmers absent from the destination — across **all** sources — accumulate into a **single** graph, producing one new layer per merge operation (not one per source).
|
||||||
|
|
||||||
If the `GraphDeBruijn` accumulator is non-empty (misses exist), write compact unitigs from the de Bruijn graph, then call the appropriate `Layer::build` variant. Before appending the source's own column, prepend `n_existing_genomes` absent columns (value 0) — one per genome already in the index — so the column count invariant holds immediately after layer creation.
|
**Pass 2 — fill column builders**
|
||||||
|
|
||||||
**d. Update partition metadata**
|
For each source and each of its layers, re-iterate unitigs and look up stored values via `SrcLayerData::lookup(kmer, src_n)`:
|
||||||
|
|
||||||
Write updated `meta.json` with the incremented `n_layers` if a new layer was added.
|
- `SrcLayerData::SetMembership` — no data directory exists; every kmer returns `vec![1; n_genomes]`
|
||||||
|
- `SrcLayerData::Presence` — reads `PersistentBitMatrix` from `presence/`
|
||||||
|
- `SrcLayerData::Count` — reads `PersistentCompactIntMatrix` from `counts/`
|
||||||
|
|
||||||
|
Hits are routed to `exist_builders[dst_layer][src_col]`; misses are routed to `new_src_builders[src_col]`.
|
||||||
|
|
||||||
|
**Column prepending for new layers**
|
||||||
|
|
||||||
|
Before source columns are written to the new layer, `n_dst_genomes` absent columns (all-zero / all-false) are prepended — one per genome already in the index — so the column count invariant holds immediately after layer creation.
|
||||||
|
|
||||||
|
**Close and update metadata**
|
||||||
|
|
||||||
|
Close all builders; update `presence/meta.json` or `counts/meta.json` with `{"n": N, "n_cols": n_dst_genomes + n_src_total}`; increment `PartitionMeta::n_layers` if a new layer was added.
|
||||||
|
|
||||||
### 4. Update index metadata
|
### 4. Update index metadata
|
||||||
|
|
||||||
Append the merged source's genome label(s) to `index.meta.genomes`. Write updated `index.meta`.
|
`index.meta` was already written during bootstrap with the complete genome list and evidence kind. No further update is needed after the partition loop.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## `append_genome_column`
|
||||||
|
|
||||||
|
Defined on two concrete specialisations of `Layer<D>`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
impl Layer<PersistentCompactIntMatrix> {
|
||||||
|
pub fn append_genome_column(layer_dir: &Path, value_of: impl Fn(usize) -> u32) -> OLMResult<()>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Layer<PersistentBitMatrix> {
|
||||||
|
pub fn append_genome_column(layer_dir: &Path, value_of: impl Fn(usize) -> bool) -> OLMResult<()>
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Each appends one column file to the matrix subdirectory (`counts/` or `presence/`). In `merge_partition`, columns are written directly via `PersistentBitVecBuilder` / `PersistentCompactIntVecBuilder` rather than through these helpers, but the invariant they enforce is the same.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -78,27 +137,31 @@ Append the merged source's genome label(s) to `index.meta.genomes`. Write update
|
|||||||
|
|
||||||
After any merge, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the total genome count in the index at that point.
|
After any merge, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the total genome count in the index at that point.
|
||||||
|
|
||||||
This is maintained by three mechanisms:
|
Maintained by three mechanisms:
|
||||||
|
|
||||||
1. **Existing layers**: one column appended per source genome (`append_genome_column`).
|
1. **Existing layers**: `n_src_total` columns appended (one per source genome).
|
||||||
2. **New layers created during merge**: `n_existing_genomes` absent columns prepended before the source's own column.
|
2. **New layers created during merge**: `n_dst_genomes` absent columns prepended before source columns.
|
||||||
3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0 before any source column is appended.
|
3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0.
|
||||||
|
|
||||||
The invariant is a precondition of the `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count.
|
The invariant is a precondition of `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## New layer construction
|
## Error variants relevant to merge
|
||||||
|
|
||||||
All kmers absent from the destination index — across **all** sources being merged — accumulate into a **single** `GraphDeBruijn` per partition. One new layer is built from this graph, not one layer per source. This keeps the layer count bounded and preserves compact unitig representation.
|
| Variant | Condition |
|
||||||
|
|---|---|
|
||||||
De Bruijn reconstruction ensures that overlapping k-1 suffixes/prefixes from different source kmers are merged into minimal unitigs before MPHF construction.
|
| `OKIError::NotIndexed(path)` | Source not in `Indexed` state |
|
||||||
|
| `OKIError::IncompatibleConfig` | Mismatched `kmer_size`, `minimizer_size`, or `n_partitions` |
|
||||||
|
| `OKIError::MismatchedMode` | Count mode but a source has `with_counts=false` |
|
||||||
|
| `OKIError::IncompatibleEvidence(msg)` | Mixed exact/approx or different approx `(b, z)` |
|
||||||
|
| `OKIError::DuplicateGenomeLabel(label)` | Duplicate label and `rename_duplicates=false` |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## On-disk impact
|
## On-disk impact
|
||||||
|
|
||||||
After merging `G` genomes (1 existing + G-1 new sources):
|
After merging `G` genomes (sources_0 contributes `G0`, subsequent sources the rest):
|
||||||
|
|
||||||
```
|
```
|
||||||
partitions/
|
partitions/
|
||||||
@@ -106,28 +169,28 @@ partitions/
|
|||||||
index/
|
index/
|
||||||
meta.json ← n_layers updated if new layer added
|
meta.json ← n_layers updated if new layer added
|
||||||
layer_0/
|
layer_0/
|
||||||
mphf.bin ← unchanged (MPHF immutable)
|
mphf.bin ← unchanged
|
||||||
unitigs.bin ← unchanged
|
unitigs.bin ← unchanged
|
||||||
evidence.bin ← unchanged
|
evidence.bin ← unchanged
|
||||||
presence/ ← created on first merge (Presence mode)
|
presence/ ← created on first merge (Presence mode)
|
||||||
meta.json {"n": N, "n_cols": G}
|
meta.json {"n": N, "n_cols": G}
|
||||||
col_000000.pbiv ← all-true (genome 0)
|
col_000000.pbiv ← all-true (genome 0 … G0-1)
|
||||||
col_000001.pbiv ← source 1 presence
|
col_000001.pbiv ← next source
|
||||||
...
|
...
|
||||||
counts/ ← extended (Count mode)
|
counts/ ← extended (Count mode)
|
||||||
meta.json {"n": N, "n_cols": G}
|
meta.json {"n": N, "n_cols": G}
|
||||||
col_000000.pciv ← genome 0 counts (from original build)
|
col_000000.pciv ← genome 0 counts (from original build)
|
||||||
col_000001.pciv ← source 1 counts
|
col_000001.pciv ← next source
|
||||||
...
|
...
|
||||||
layer_1/ ← new layer (if new kmers found)
|
layer_N/ ← new layer (if new kmers found)
|
||||||
mphf.bin
|
mphf.bin
|
||||||
unitigs.bin
|
unitigs.bin
|
||||||
evidence.bin
|
evidence.bin
|
||||||
presence/ or counts/
|
presence/ or counts/
|
||||||
meta.json {"n": N1, "n_cols": G}
|
meta.json {"n": N1, "n_cols": G}
|
||||||
col_000000.pbiv ← all-false (genome 0 absent from this layer)
|
col_000000.pbiv ← all-false (absent for existing genomes)
|
||||||
...
|
...
|
||||||
col_000001.pbiv ← source 1 presence in this new layer
|
spectrums/
|
||||||
|
<label>.json ← one file per genome, rebuilt from all sources
|
||||||
|
index.meta ← complete genome list + evidence kind written at bootstrap
|
||||||
```
|
```
|
||||||
|
|
||||||
The `index.meta` genome list grows from 1 entry to G entries after all sources are merged.
|
|
||||||
|
|||||||
@@ -27,10 +27,10 @@ part_XXXXX/
|
|||||||
|
|
||||||
After filtering (applying a min-count threshold derived from the spectrum) and building the local De Bruijn graph + unitigs (see [Construction pipeline](pipeline.md)), the exact filtered kmer set is available via `unitigs.bin`.
|
After filtering (applying a min-count threshold derived from the spectrum) and building the local De Bruijn graph + unitigs (see [Construction pipeline](pipeline.md)), the exact filtered kmer set is available via `unitigs.bin`.
|
||||||
|
|
||||||
`MphfLayer::build` is called on the unitig file:
|
`MphfLayer::build(dir, block_bits, fill_slot)` is called on the unitig directory:
|
||||||
|
|
||||||
1. **Pass 1**: iterate all canonical kmers from `unitigs.bin` in parallel, build and store `mphf.bin` (ptr_hash).
|
1. **Pass 1**: build `.idx` via `build_unitig_idx(unitig_path, block_bits)`, then iterate all canonical kmers in parallel over chunks using `(0..unitigs.len()).into_par_iter()` + `unitigs.unitig(ci).into_canonical_kmers()`. Constructs and stores `mphf.bin` (ptr_hash, `new_from_par_iter`).
|
||||||
2. **Pass 2**: iterate sequentially, fill `evidence.bin`, call the mode-specific `fill_slot` callback.
|
2. **Pass 2**: iterate sequentially with `iter_indexed_canonical_kmers`; fill `evidence.bin`; call `fill_slot(slot, kmer)` callback once per kmer for DataStore population.
|
||||||
|
|
||||||
`mphf1.bin` and `counts1.bin` are no longer needed after phase 2 and can be deleted.
|
`mphf1.bin` and `counts1.bin` are no longer needed after phase 2 and can be deleted.
|
||||||
|
|
||||||
@@ -105,9 +105,12 @@ Each layer is a self-contained unit. See [obilayeredmap](obilayeredmap.md) for t
|
|||||||
|
|
||||||
```
|
```
|
||||||
layer_i/
|
layer_i/
|
||||||
unitigs.bin — packed 2-bit nucleotide sequences (kmer evidence)
|
unitigs.bin — packed 2-bit nucleotide sequences (kmer evidence source)
|
||||||
|
unitigs.bin.idx — random-access block index (block_bits controls granularity)
|
||||||
mphf.bin — ptr_hash phase-2 MPHF
|
mphf.bin — ptr_hash phase-2 MPHF
|
||||||
evidence.bin — n × u32: (chunk_id: 25 bits | rank: 7 bits) per slot
|
evidence.bin — n × (chunk_id: 25 bits | rank: 7 bits) per slot [exact mode]
|
||||||
|
fingerprint.bin — n × b-bit fingerprints per slot [approx mode]
|
||||||
|
layer_meta.json — evidence kind, recorded at build time
|
||||||
```
|
```
|
||||||
|
|
||||||
Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0 is built from dataset A. Adding dataset B:
|
Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0 is built from dataset A. Adding dataset B:
|
||||||
@@ -116,9 +119,45 @@ Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0
|
|||||||
2. Collect kmers of B not present in any layer → set `B \ A`.
|
2. Collect kmers of B not present in any layer → set `B \ A`.
|
||||||
3. Build layer 1 from `B \ A` (dereplicate → count → De Bruijn → unitigs → `MphfLayer::build`).
|
3. Build layer 1 from `B \ A` (dereplicate → count → De Bruijn → unitigs → `MphfLayer::build`).
|
||||||
|
|
||||||
|
### Evidence modes
|
||||||
|
|
||||||
|
Two evidence modes are supported, selected at build time via `EvidenceKind` and recorded in `layer_meta.json`.
|
||||||
|
|
||||||
|
**Exact** (`EvidenceKind::Exact`): `evidence.bin` stores one `(chunk_id, rank)` pair per MPHF slot, encoding the position of the corresponding kmer in `unitigs.bin`. Membership verification reconstructs the kmer from `(chunk_id, rank)` and compares it to the query. Zero false positives. Requires `.idx` for random access.
|
||||||
|
|
||||||
|
**Approx** (`EvidenceKind::Approx { b, z }`): `fingerprint.bin` stores a `b`-bit hash of the kmer at each MPHF slot. Membership check compares `kmer.seq_hash()` against the stored fingerprint. False-positive rate: 1/2^b per query. No `.idx` is written or needed.
|
||||||
|
|
||||||
|
### Build functions
|
||||||
|
|
||||||
|
```
|
||||||
|
MphfLayer::build(dir, block_bits, fill_slot)
|
||||||
|
Pass 1: par_iter over chunks via .idx → build mphf.bin
|
||||||
|
Pass 2: sequential iter → fill evidence.bin + call fill_slot
|
||||||
|
|
||||||
|
MphfLayer::build_exact_evidence(dir, block_bits)
|
||||||
|
Standalone post-hoc: builds evidence.bin + .idx from existing mphf.bin + unitigs.bin
|
||||||
|
Uses open_sequential(); no .idx required on entry
|
||||||
|
|
||||||
|
MphfLayer::build_approx_evidence(dir, b, z)
|
||||||
|
Standalone post-hoc: builds fingerprint.bin from existing mphf.bin + unitigs.bin
|
||||||
|
Uses open_sequential(); never writes .idx
|
||||||
|
|
||||||
|
MphfLayer::build_evidence(dir, kind, block_bits)
|
||||||
|
Dispatch wrapper: routes to build_exact_evidence or build_approx_evidence
|
||||||
|
```
|
||||||
|
|
||||||
|
`build` always produces exact evidence. If approximate evidence is needed (e.g. `EvidenceKind::Approx`), the caller invokes `build_approx_evidence` after `build` to replace the evidence bundle.
|
||||||
|
|
||||||
|
In `obikpartitionner`, `build_index_layer` receives `block_bits: u8` from `IndexConfig::block_bits` and forwards it directly to `Layer::build` and `Layer::build_approx_evidence`.
|
||||||
|
|
||||||
### Membership verification
|
### Membership verification
|
||||||
|
|
||||||
ptr_hash maps any input to a valid slot — it does not natively detect absent keys. Membership is verified using the evidence entry: decode the kmer from `(chunk_id, rank)` and compare to the query. A mismatch means the kmer is absent from this layer; probe the next layer.
|
ptr_hash maps any input to a valid slot — it does not natively detect absent keys. Membership is verified using the evidence entry:
|
||||||
|
|
||||||
|
- **Exact**: decode `(chunk_id, rank)` from `evidence.bin`; reconstruct the kmer via `unitigs.verify_canonical_kmer`; compare to query.
|
||||||
|
- **Approx**: compare `kmer.seq_hash()` to the b-bit fingerprint stored at the slot.
|
||||||
|
|
||||||
|
A mismatch in either mode means the kmer is absent from this layer; probe the next layer.
|
||||||
|
|
||||||
### Query algorithm
|
### Query algorithm
|
||||||
|
|
||||||
@@ -126,12 +165,12 @@ ptr_hash maps any input to a valid slot — it does not natively detect absent k
|
|||||||
fn query(kmer) → Option<(layer_index, slot)>:
|
fn query(kmer) → Option<(layer_index, slot)>:
|
||||||
for (i, layer) in layers.iter().enumerate():
|
for (i, layer) in layers.iter().enumerate():
|
||||||
slot = layer.mphf.index(kmer)
|
slot = layer.mphf.index(kmer)
|
||||||
if layer.evidence.decode(slot) matches kmer:
|
if layer.evidence.matches(slot, kmer): // exact or approx dispatch
|
||||||
return Some((i, slot))
|
return Some((i, slot))
|
||||||
return None
|
return None
|
||||||
```
|
```
|
||||||
|
|
||||||
Expected probe depth: 1 for kmers in layer 0. Each probe is a ptr_hash lookup (~10 ns) plus one evidence decode.
|
`MphfLayer::find` dispatches transparently to `find_exact` or `find_approx` based on the evidence loaded at `open` time. Expected probe depth: 1 for kmers in layer 0. Each probe is a ptr_hash lookup (~10 ns) plus one evidence check.
|
||||||
|
|
||||||
### Merging layers
|
### Merging layers
|
||||||
|
|
||||||
|
|||||||
@@ -1,16 +1,26 @@
|
|||||||
# obipipeline — parallel pipeline library
|
# obipipeline — parallel pipeline library
|
||||||
|
|
||||||
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
|
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **stages**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
|
||||||
|
|
||||||
## Core types
|
## Core types
|
||||||
|
|
||||||
| Type alias | Rust type | Role |
|
| Type alias | Rust type | Role |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send+Sync>` | Called repeatedly; `FnMut` because it holds iterator state |
|
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send>` | Called repeatedly; `FnMut` because it holds iterator state |
|
||||||
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send+Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) |
|
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | 1→1 transform shared across workers via `Arc::clone` |
|
||||||
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send+Sync>` | Final consumer; returns `Result` so errors propagate back |
|
| `SharedFlatFn<D>` | `Arc<dyn Fn(D, &Sender<Result<D, _>>, &Sender<isize>) + Send + Sync>` | 1→N transform; pushes items into channel, sends delta |
|
||||||
|
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send>` | Final consumer; returns `Result` so errors propagate back |
|
||||||
|
|
||||||
`Pipeline<D>` holds one `SourceFn`, a `Vec<SharedFn>`, and one `SinkFn`.
|
Stages come in two variants:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub enum Stage<D> {
|
||||||
|
Transform(SharedFn<D>), // 1→1
|
||||||
|
Flat(SharedFlatFn<D>), // 1→N
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`Pipeline<D>` holds one `SourceFn`, a `Vec<Stage>`, and one `SinkFn`.
|
||||||
`WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
|
`WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
|
||||||
|
|
||||||
## WorkerPool
|
## WorkerPool
|
||||||
@@ -23,7 +33,7 @@ WorkerPool::run(self)
|
|||||||
| Parameter | Role |
|
| Parameter | Role |
|
||||||
|---|---|
|
|---|---|
|
||||||
| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
|
| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
|
||||||
| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
|
| `capacity` | Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
|
||||||
|
|
||||||
`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
|
`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
|
||||||
|
|
||||||
@@ -43,7 +53,7 @@ Each variant carries the concrete type for one stage's output. The macros patter
|
|||||||
|
|
||||||
## Macros
|
## Macros
|
||||||
|
|
||||||
Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
|
Eight low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
|
||||||
|
|
||||||
### Low-level
|
### Low-level
|
||||||
|
|
||||||
@@ -54,6 +64,9 @@ make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T
|
|||||||
make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
|
make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
|
||||||
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
|
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
|
||||||
|
|
||||||
|
make_flat_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> impl IntoIterator<Item=U>
|
||||||
|
make_flat_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<impl IntoIterator<Item=U>, E>
|
||||||
|
|
||||||
make_sink!(Enum, func, InputVariant) // func: T -> ()
|
make_sink!(Enum, func, InputVariant) // func: T -> ()
|
||||||
make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
|
make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
|
||||||
```
|
```
|
||||||
@@ -66,8 +79,10 @@ Each macro wraps the closure in the correct smart pointer (`Box` for source/sink
|
|||||||
make_pipeline! {
|
make_pipeline! {
|
||||||
DataEnum,
|
DataEnum,
|
||||||
source iterator => OutputVariant, // or source? for fallible
|
source iterator => OutputVariant, // or source? for fallible
|
||||||
| func: In => Out, // non-fallible transform
|
| func: In => Out, // 1→1 non-fallible transform
|
||||||
|? func: In => Out, // fallible transform
|
|? func: In => Out, // 1→1 fallible transform
|
||||||
|
|| func: In => Out, // 1→N non-fallible flat transform
|
||||||
|
||? func: In => Out, // 1→N fallible flat transform
|
||||||
sink func @ InputVariant, // or sink? for fallible
|
sink func @ InputVariant, // or sink? for fallible
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@@ -75,12 +90,29 @@ make_pipeline! {
|
|||||||
`?` marks fallibility on source, individual transforms, or sink independently.
|
`?` marks fallibility on source, individual transforms, or sink independently.
|
||||||
Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
|
Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
|
||||||
|
|
||||||
|
### make_pipe! DSL
|
||||||
|
|
||||||
|
`make_pipe!` builds a sourceless/sinkless `Pipe<D, In, Out>` — a reusable, composable stage sequence:
|
||||||
|
|
||||||
|
```
|
||||||
|
make_pipe! {
|
||||||
|
DataEnum : InType => OutType,
|
||||||
|
| func: InVariant => OutVariant,
|
||||||
|
|? func: InVariant => OutVariant,
|
||||||
|
|| func: InVariant => OutVariant,
|
||||||
|
||? func: InVariant => OutVariant,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Two pipes compose with `.then(other)`. Apply to an iterator with `.apply(iter, n_workers, capacity)` to get a `PipeIter<Out>` — an iterator over the pipeline output, backed by a background `WorkerPool`. The scatter step in `obikmer` uses `make_pipe!` and `.apply()` rather than the full `make_pipeline!` / `WorkerPool` pattern.
|
||||||
|
|
||||||
## Scheduler architecture
|
## Scheduler architecture
|
||||||
|
|
||||||
```
|
```
|
||||||
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
|
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
|
||||||
▲ │
|
▲ │
|
||||||
[stage_rxs] ────────┘◄──────────────────────────────┘
|
[stage_rxs] ────────┘◄──────────────────────────────┘
|
||||||
|
[flat_delta_rx] ──► Scheduler (in_flight adjustment)
|
||||||
│
|
│
|
||||||
[sink_err_rx] ← errors from sink (highest priority)
|
[sink_err_rx] ← errors from sink (highest priority)
|
||||||
│
|
│
|
||||||
@@ -91,23 +123,23 @@ The scheduler is a single thread running a biased `Select` over all input channe
|
|||||||
|
|
||||||
```
|
```
|
||||||
index 0 sink_err_rx abort on sink error
|
index 0 sink_err_rx abort on sink error
|
||||||
index 1 stage_rxs[N-1] drain last stage first
|
index 1 flat_delta_rx adjust in_flight before dispatching
|
||||||
...
|
index 2..=n+1 stage_rxs[n-1..0] drain last stage first
|
||||||
index N stage_rxs[0]
|
index n+2 source_rx pull new data last
|
||||||
index N+1 source_rx pull new data last
|
|
||||||
```
|
```
|
||||||
|
|
||||||
This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
|
This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
|
||||||
|
|
||||||
**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.
|
**Workers** are generic: each receives a `WorkerTask` — either `Transform(data, stage_idx)` or `Flat(data, stage_idx)`. For `Transform`, the worker calls `f(data)` and sends the result to `stage_txs[stage_idx]`. For `Flat`, the worker calls `f(data, &push_tx, &delta_tx)`: the closure pushes N items into `push_tx` then sends `N-1` to `delta_tx`. The scheduler uses the delta to adjust `in_flight` without knowing N in advance.
|
||||||
|
|
||||||
**Termination** uses an `in_flight` counter:
|
**Termination** uses an `in_flight: isize` counter and a `flat_workers_active: usize` counter:
|
||||||
|
|
||||||
- incremented when an item is dispatched from source to workers
|
- `in_flight` incremented when an item is dispatched from source to workers
|
||||||
- decremented when the item exits the last stage
|
- `in_flight` decremented when the item exits the last stage to the sink
|
||||||
- the loop exits only when `source_done && in_flight == 0`
|
- `flat_workers_active` incremented when a `Flat` task is dispatched, decremented when the delta arrives
|
||||||
|
- the loop exits only when `source_done && in_flight == 0 && flat_workers_active == 0`
|
||||||
|
|
||||||
This guarantees all in-flight items complete before `join()`.
|
This guarantees all in-flight items complete (including all N outputs of a flat stage) before `join()`.
|
||||||
|
|
||||||
## Error handling
|
## Error handling
|
||||||
|
|
||||||
@@ -118,7 +150,7 @@ This guarantees all in-flight items complete before `join()`.
|
|||||||
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
|
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
|
||||||
| `TypeMismatch` | Wrong enum variant arrived at a stage |
|
| `TypeMismatch` | Wrong enum variant arrived at a stage |
|
||||||
| `StepKindMismatch` | Internal routing error |
|
| `StepKindMismatch` | Internal routing error |
|
||||||
| `StepError(Box<dyn Error>)` | Error from user code (wrapped by `make_*_fallible!`) |
|
| `StepError(Box<dyn Error + Send + Sync>)` | Error from user code (wrapped by `make_*_fallible!`) |
|
||||||
|
|
||||||
Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
|
Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
|
||||||
|
|
||||||
|
|||||||
@@ -88,9 +88,7 @@ Implemented as a three-step pipeline in `count_partition()`:
|
|||||||
2. **Provisional MPHF** (ptr_hash): built from `sorted_unique.bin` via `new_from_par_iter(f0, ...)`. Stored to `mphf1.bin`; `sorted_unique.bin` deleted immediately.
|
2. **Provisional MPHF** (ptr_hash): built from `sorted_unique.bin` via `new_from_par_iter(f0, ...)`. Stored to `mphf1.bin`; `sorted_unique.bin` deleted immediately.
|
||||||
3. **Accumulation pass**: re-read dereplicated superkmers; for each kmer, `slot = mphf.index(kmer.raw())`, increment `counts1[slot]` by the superkmer COUNT. Stored in a `PersistentCompactIntVec` (`counts1.bin`).
|
3. **Accumulation pass**: re-read dereplicated superkmers; for each kmer, `slot = mphf.index(kmer.raw())`, increment `counts1[slot]` by the superkmer COUNT. Stored in a `PersistentCompactIntVec` (`counts1.bin`).
|
||||||
|
|
||||||
At the end of this phase, each distinct canonical kmer has its exact total count, and the frequency spectrum (`kmer_spectrum_raw.json`) is written.
|
At the end of this phase, each distinct canonical kmer has its exact total count, and the frequency spectrum (`spectrums/{label}.json`) is written to the index root.
|
||||||
|
|
||||||
Abundance filter applied here: kmers with `total_count < q` are discarded. `q` is a collection parameter (0 = keep all, including singletons for ≤1x data).
|
|
||||||
|
|
||||||
No pre-filter on super-kmer COUNT is possible at phase 2: a super-kmer with COUNT=1 may contain only high-abundance kmers, each present in many other super-kmers across the partition.
|
No pre-filter on super-kmer COUNT is possible at phase 2: a super-kmer with COUNT=1 may contain only high-abundance kmers, each present in many other super-kmers across the partition.
|
||||||
|
|
||||||
@@ -140,19 +138,70 @@ Output: `unitigs.bin` — the permanent evidence structure for the partition. Ea
|
|||||||
|
|
||||||
## Phase 6 — MPHF construction and index finalisation
|
## Phase 6 — MPHF construction and index finalisation
|
||||||
|
|
||||||
Built once on the definitive kmer set (all kmers in all unitigs of the partition). See [obilayeredmap](obilayeredmap.md) and [MPHF selection](mphf.md) for the current implementation.
|
`build_index_layer` is called per partition (in parallel via `build_layers`) with the following parameters sourced from `IndexConfig`:
|
||||||
|
|
||||||
|
- `block_bits` — from `IndexConfig::block_bits`; controls the `.idx` block size (2^block_bits unitig chunks per block) for exact evidence
|
||||||
|
- `evidence` — `EvidenceKind::Exact` or `EvidenceKind::Approx { b, z }`; propagated unchanged from `IndexConfig::evidence`
|
||||||
|
- `min_ab` / `max_ab` — abundance bounds applied before graph construction
|
||||||
|
- `with_counts` — whether to store kmer counts alongside set membership
|
||||||
|
|
||||||
|
**Abundance filtering:** when `min_ab > 1` or `max_ab.is_some()`, the provisional `mphf1.bin` and `counts1.bin` produced in phase 3 are memory-mapped. Each canonical kmer is accepted only if its count in `counts1` satisfies the bounds. If either file is absent, filtering is skipped (all kmers accepted).
|
||||||
|
|
||||||
```
|
```
|
||||||
kmers from unitigs → MPHF → mphf.bin
|
for each kmer in dereplicated super-kmer:
|
||||||
→ evidence.bin : n × u32, each = (chunk_id: 25 bits | rank: 7 bits)
|
ab = counts1[mphf1.index(kmer.raw())]
|
||||||
→ payload : counts/ (mode 2) or presence/ (mode 3)
|
if ab < min_ab || ab > max_ab: skip
|
||||||
|
graph.push(kmer)
|
||||||
```
|
```
|
||||||
|
|
||||||
The MPHF is built in two passes over `unitigs.bin`: parallel pass for `mphf.bin`, sequential pass for `evidence.bin` and payload. The exact kmer count is available from the unitig index (`unitigs.bin.idx`) before the passes begin.
|
**Graph build and unitig write:**
|
||||||
|
|
||||||
**Exact verification via unitig evidence:**
|
The surviving kmers are fed into `GraphDeBruijn`, which computes degrees and yields unitigs. Unitigs are written to `layer_0/unitigs.bin` via a `UnitigFileWriter`.
|
||||||
|
|
||||||
`unitigs.bin` serves as the evidence structure. The MPHF maps every input to `[0, N)` including absent kmers — the unitig read-back (via `evidence.bin`) is the only correct membership test.
|
**MPHF and evidence build:**
|
||||||
|
|
||||||
|
`Layer::build` (membership-only) or `Layer::<PersistentCompactIntMatrix>::build` (with counts) is called next. Internally, `MphfLayer::build` performs two passes:
|
||||||
|
|
||||||
|
1. **Pass 1 (parallel):** build `unitigs.bin.idx` (block size = 2^`block_bits`) then construct the MPHF from all canonical kmers in `unitigs.bin`; store to `mphf.bin`.
|
||||||
|
2. **Pass 2 (sequential):** for each kmer in `unitigs.bin`, compute its slot and write `evidence.bin` (`chunk_id: 25 bits | rank: 7 bits` packed into a `u32`); also invoke the payload callback (`fill_slot`) to populate `counts/` if `with_counts`.
|
||||||
|
|
||||||
|
After `Layer::build` completes, `layer_meta.json` records `EvidenceKind::Exact`.
|
||||||
|
|
||||||
|
**Approximate evidence override:**
|
||||||
|
|
||||||
|
If `evidence` is `EvidenceKind::Approx { b, z }`, `build_approx_evidence` is called immediately after `Layer::build`. It overwrites the exact evidence bundle with `fingerprint.bin` (b-bit hash per slot) and rewrites `layer_meta.json` with `EvidenceKind::Approx { b, z }`. No `.idx` file is needed at query time in this mode.
|
||||||
|
|
||||||
|
```
|
||||||
|
// Exact path → evidence.bin + unitigs.bin.idx + layer_meta.json(Exact)
|
||||||
|
// Approx path → fingerprint.bin + layer_meta.json(Approx{b,z})
|
||||||
|
// (evidence.bin left on disk but not used)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Partition metadata:**
|
||||||
|
|
||||||
|
After all layer files are written, `PartitionMeta { n_layers: 1 }` is serialised to `index/meta.json` inside the partition directory. This file is required by `LayeredMap::open` for subsequent merge operations.
|
||||||
|
|
||||||
|
**File layout per partition after phase 6:**
|
||||||
|
|
||||||
|
```
|
||||||
|
part_XXXXX/
|
||||||
|
index/
|
||||||
|
meta.json ← PartitionMeta { n_layers: 1 }
|
||||||
|
layer_0/
|
||||||
|
unitigs.bin ← permanent evidence (all modes)
|
||||||
|
unitigs.bin.idx ← block index (exact mode only)
|
||||||
|
mphf.bin ← MPHF
|
||||||
|
evidence.bin ← exact evidence (exact mode)
|
||||||
|
fingerprint.bin ← b-bit fingerprints (approx mode)
|
||||||
|
layer_meta.json ← EvidenceKind tag
|
||||||
|
counts/ ← PersistentCompactIntMatrix (with_counts only)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Cleanup:** unless `--keep-intermediate` is set, `remove_build_artifacts` deletes `dereplicated.skmer.zst`, `mphf1.bin`, and `counts1.bin` after all partitions are indexed.
|
||||||
|
|
||||||
|
See [obilayeredmap](obilayeredmap.md) and [MPHF selection](mphf.md) for data structure details.
|
||||||
|
|
||||||
|
**Query path (exact evidence):**
|
||||||
|
|
||||||
```
|
```
|
||||||
query kmer q
|
query kmer q
|
||||||
@@ -164,4 +213,12 @@ query kmer q
|
|||||||
→ no match: kmer absent ← MPHF collision on absent kmer
|
→ no match: kmer absent ← MPHF collision on absent kmer
|
||||||
```
|
```
|
||||||
|
|
||||||
`superkmers.bin.gz` is no longer needed at this point and can be deleted.
|
**Query path (approximate evidence):**
|
||||||
|
|
||||||
|
```
|
||||||
|
query kmer q
|
||||||
|
→ MPHF(q) → slot s
|
||||||
|
→ fingerprint[s] matches seq_hash(q)?
|
||||||
|
→ yes : probable hit (FP rate = 1/2^b per kmer, 1/2^(b·z) per z-window)
|
||||||
|
→ no : kmer absent
|
||||||
|
```
|
||||||
|
|||||||
@@ -2,36 +2,47 @@
|
|||||||
|
|
||||||
## Memory layout
|
## Memory layout
|
||||||
|
|
||||||
A super-kmer is stored as a **32-bit header** followed by a **byte-aligned nucleotide sequence** (2 bits/base, nucleotide 0 at the MSB of the first byte):
|
`SuperKmer` holds two separate fields:
|
||||||
|
|
||||||
| Field | Bits | Role |
|
|
||||||
|-------|------|------|
|
|
||||||
| COUNT | 24 | Occurrence count (≤ 16 M) |
|
|
||||||
| NKMERS | 8 | Number of kmers (= seq_length − k + 1, range 1–255) |
|
|
||||||
|
|
||||||
Bit layout (MSB to LSB): `[31:8] COUNT [7:0] NKMERS`
|
|
||||||
|
|
||||||
NKMERS is stored as a raw `u8` in **kmer units**, not nucleotides. The nucleotide length is recovered as `NKMERS + k − 1`. This avoids the awkward wrapping convention (`0 = 256`) that would be needed if nucleotide length were stored directly, and gains k−1 = 30 units of headroom:
|
|
||||||
|
|
||||||
| unit | u8 covers | max nucleotides |
|
|
||||||
|---|---|---|
|
|
||||||
| nucleotides | 255 nt | 225 kmers |
|
|
||||||
| **kmers** | **255 kmers** | **285 nt** |
|
|
||||||
|
|
||||||
The public accessors:
|
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
fn n_kmers(&self) -> usize { (self.0 & 0xFF) as usize }
|
pub struct SuperKmer {
|
||||||
fn seql(&self) -> usize { self.n_kmers() + K - 1 }
|
pub(crate) count: u32,
|
||||||
fn count(&self) -> u32 { self.0 >> 8 }
|
pub(crate) inner: PackedSeq,
|
||||||
fn increment(&mut self) { self.0 += 1 << 8; }
|
}
|
||||||
fn add(&mut self, n: u32) { self.0 += n << 8; }
|
|
||||||
fn set_count(&mut self, n: u32) { self.0 = (self.0 & 0xFF) | (n << 8); }
|
|
||||||
```
|
```
|
||||||
|
|
||||||
In practice, observed super-kmer lengths on metagenomic data (k=31) are below 55 nucleotides (≤ 25 kmers) — far from the 255-kmer cap. If a super-kmer ever exceeds 255 kmers, it is split with a k−1 nucleotide overlap, preserving all kmers without duplication (identical mechanism to partition-boundary splits).
|
`PackedSeq` stores a 2-bit packed DNA sequence as a heap-allocated `Box<[u8]>` plus a `tail: u8` field:
|
||||||
|
|
||||||
The sequence is always stored in canonical form (lexicographic minimum of forward and reverse complement), with nucleotide 0 at the MSB of the first byte. The byte array can be hashed directly without any adjustment.
|
| Field | Type | Role |
|
||||||
|
|-------|------|------|
|
||||||
|
| `tail` | `u8` | Number of valid nucleotides in the last byte: 0 encodes 4, 1–3 are identity |
|
||||||
|
| `seq` | `Box<[u8]>` | 2-bit packed bytes, nucleotide 0 at bits 7–6 of `seq[0]` |
|
||||||
|
|
||||||
|
Nucleotide length is recovered without storing it explicitly:
|
||||||
|
|
||||||
|
```text
|
||||||
|
seql = (seq.len() - 1) * 4 + tail_count(tail)
|
||||||
|
```
|
||||||
|
|
||||||
|
There is no packed header word — `count` and the sequence live in separate fields.
|
||||||
|
|
||||||
|
The on-disk binary format (produced by `write_to_binary`) is:
|
||||||
|
|
||||||
|
```text
|
||||||
|
[varint(count)] [u8: seql − k] [packed bytes…]
|
||||||
|
```
|
||||||
|
|
||||||
|
`seql − k` fits in a `u8` when `n_kmers = seql − k + 1 ≤ MAX_KMERS_PER_CHUNK (= 256)`. If a super-kmer exceeds 256 kmers, `write_to_binary` splits it into overlapping chunks (k−1 nucleotide overlap, same count per chunk), each a self-contained record readable by `read_from_binary`.
|
||||||
|
|
||||||
|
The public accessors operate on the struct fields directly:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
fn seql(&self) -> usize { self.inner.seql() }
|
||||||
|
fn count(&self) -> u32 { self.count }
|
||||||
|
fn increment(&mut self) { self.count += 1; }
|
||||||
|
fn add(&mut self, n: u32) { self.count += n; }
|
||||||
|
fn set_count(&mut self, n: u32) { self.count = n; }
|
||||||
|
```
|
||||||
|
|
||||||
## ASCII encoding and decoding
|
## ASCII encoding and decoding
|
||||||
|
|
||||||
@@ -72,7 +83,7 @@ const fn revcomp4(x: u8) -> u8 {
|
|||||||
**Step 2 — realignment.** After step 1, `padding = n × 8 − seql × 2` spurious bits (complements of the original padding A's) appear at the start of the array. They are flushed left using `BitSlice<u8, Msb0>::rotate_left(padding)` from the `bitvec` crate, which is SIMD-accelerated. The trailing `padding` bits are then zeroed:
|
**Step 2 — realignment.** After step 1, `padding = n × 8 − seql × 2` spurious bits (complements of the original padding A's) appear at the start of the array. They are flushed left using `BitSlice<u8, Msb0>::rotate_left(padding)` from the `bitvec` crate, which is SIMD-accelerated. The trailing `padding` bits are then zeroed:
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
let seql = self.n_kmers() + k - 1;
|
let seql = self.seql();
|
||||||
shift = n * 8 - seql * 2 // number of padding bits
|
shift = n * 8 - seql * 2 // number of padding bits
|
||||||
bits.rotate_left(shift)
|
bits.rotate_left(shift)
|
||||||
bits[len - shift..].fill(false)
|
bits[len - shift..].fill(false)
|
||||||
@@ -93,7 +104,7 @@ bits[len - shift..].fill(false)
|
|||||||
|
|
||||||
## Minimizer sliding window
|
## Minimizer sliding window
|
||||||
|
|
||||||
Super-kmers are built by `SuperKmerIter` (crate `obiskbuilder`), which maintains the current minimizer with a **monotonic deque** over a sliding window of W = k − m + 1 m-mer positions.
|
Super-kmers are built by `SuperKmerIter` (crate `obiskbuilder`), which tracks the current minimizer with a **monotonic deque** (`Ring<MmerItem, 32>`) inside `RollingStat`, a rolling-window entropy and minimizer tracker.
|
||||||
|
|
||||||
Each deque entry stores:
|
Each deque entry stores:
|
||||||
|
|
||||||
@@ -101,20 +112,9 @@ Each deque entry stores:
|
|||||||
|------------|-------|----------------------------------------------|
|
|------------|-------|----------------------------------------------|
|
||||||
| `position` | usize | 0-based start of this m-mer in the segment |
|
| `position` | usize | 0-based start of this m-mer in the segment |
|
||||||
| `canonical`| u64 | right-aligned canonical m-mer value (lex-min of fwd and rc); used as partition key |
|
| `canonical`| u64 | right-aligned canonical m-mer value (lex-min of fwd and rc); used as partition key |
|
||||||
| `hash` | u64 | $H(\text{canonical})$ — ordering key for random minimizer selection |
|
| `hash` | u64 | `hash_kmer(canonical << (64 − 2m))` — ordering key for random minimizer selection |
|
||||||
|
|
||||||
The hash $H$ is the seeded splitmix64 finalizer (see [Minimizer selection](../theory/minimizer.md)):
|
The hash uses the seeded splitmix64 finalizer (`mix64(raw ^ 0x9e3779b97f4a7c15)`), the same function as `kmer::hash_kmer`.
|
||||||
|
|
||||||
```rust
|
|
||||||
fn hash_mmer(canonical: u64) -> u64 {
|
|
||||||
let x = canonical ^ 0x9e3779b97f4a7c15; // seed: eliminates fixed point at 0
|
|
||||||
let x = x ^ (x >> 30);
|
|
||||||
let x = x.wrapping_mul(0xbf58476d1ce4e5b9);
|
|
||||||
let x = x ^ (x >> 27);
|
|
||||||
let x = x.wrapping_mul(0x94d049bb133111eb);
|
|
||||||
x ^ (x >> 31)
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
On each new nucleotide, once the window is full, the deque is updated:
|
On each new nucleotide, once the window is full, the deque is updated:
|
||||||
|
|
||||||
@@ -133,24 +133,27 @@ On each new nucleotide, once the window is full, the deque is updated:
|
|||||||
|
|
||||||
The front of the deque is always the current minimizer. Because the deque is maintained in strictly increasing hash order, each entry is popped at most once — O(1) amortized per nucleotide.
|
The front of the deque is always the current minimizer. Because the deque is maintained in strictly increasing hash order, each entry is popped at most once — O(1) amortized per nucleotide.
|
||||||
|
|
||||||
A super-kmer boundary is emitted when the minimizer changes: `deque.front.hash ≠ prev_hash`. The `canonical` field of the front entry is **not** used for boundary detection — that uses the hash alone. The canonical value is stored so that the partition key $H(\text{canonical})$ can be recomputed independently at routing time from the stored `minimizer_pos`, without inheriting the minimum-order-statistic bias (see [Minimizer selection — partition key independence](../theory/minimizer.md#partition-key-independence)).
|
A super-kmer boundary is emitted when the minimizer changes: `current_minimizer != prev_minimizer`. `SuperKmerIter` also emits a boundary when:
|
||||||
|
|
||||||
|
- entropy of the current k-mer falls at or below the threshold θ (cursor retreated by k−1)
|
||||||
|
- super-kmer length reaches 256 nucleotides (cursor retreated by k)
|
||||||
|
|
||||||
## Kmer extraction
|
## Kmer extraction
|
||||||
|
|
||||||
A k-mer is extracted from a super-kmer with `SuperKmer::kmer(i, k)`, which returns a `Kmer` — a left-aligned `u64` newtype (see [Kmer implementation](kmer.md)):
|
A k-mer is extracted from a super-kmer with `SuperKmer::kmer(i)`, which delegates to `PackedSeq::extract::<KLen>(i)` and returns a `Kmer` — a left-aligned `u64` newtype (see [Kmer implementation](kmer.md)):
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
pub fn kmer(&self, i: usize, k: usize) -> Result<Kmer, KmerError>
|
pub fn kmer(&self, i: usize) -> Result<Kmer, KmerError>
|
||||||
```
|
```
|
||||||
|
|
||||||
The bit slice `seq[i*2 .. (i+k)*2]` (Msb0 order) is loaded as a big-endian `u64` via `bitvec::load_be`, then left-shifted to produce the canonical left-aligned layout. One call — no loop, no allocation.
|
The bit slice `seq[i*2 .. (i+k)*2]` (Msb0 order) is loaded as a `u64` via `bitvec::load_be`, then left-shifted to produce the canonical left-aligned layout. One call — no loop, no allocation.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
!!! abstract "Algorithm — Super-kmer reverse complement"
|
!!! abstract "Algorithm — Super-kmer reverse complement"
|
||||||
```text
|
```text
|
||||||
procedure SuperKmerRevcomp(seq, SEQL):
|
procedure SuperKmerRevcomp(seq, SEQL):
|
||||||
seql ← NKMERS + k − 1 -- nucleotide length
|
seql ← nucleotide length
|
||||||
n ← ⌈seql / 4⌉ -- number of bytes
|
n ← ⌈seql / 4⌉ -- number of bytes
|
||||||
shift ← n × 8 − seql × 2 -- padding bits to flush
|
shift ← n × 8 − seql × 2 -- padding bits to flush
|
||||||
|
|
||||||
|
|||||||
@@ -174,7 +174,8 @@ Evidence cost decreases by 1 bit/kmer with each doubling of partition count (via
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Open questions
|
## Alternative: fingerprint evidence
|
||||||
|
|
||||||
- **Cross-partition set operations**: strategy B allows unitig-level operations (mark entire chunks present/absent) rather than kmer-level, reducing cost by a factor of m_u.
|
`evidence.bin` can be replaced by `fingerprint.bin` at index build time (`--approx`) or after the fact (`reindex --approx`). The fingerprint stores b bits per MPHF slot (the low b bits of `kmer.seq_hash()`); verification becomes a single bitfield comparison instead of a unitig dereference. False-positive rate per k-mer query: 1/2^b. With the Findere z parameter, z consecutive k-mers must all match, reducing the effective window FP rate to 1/2^(b·z) while skipping z−1 of every z k-mers. No `.idx` file is written or read in approx mode.
|
||||||
- **Eliminating evidence.bin**: at ~66% of per-layer lookup footprint, `evidence.bin` dominates index size. See [Evidence elimination design discussion](evidence_elimination.md).
|
|
||||||
|
See [Approximate evidence (Findere fingerprint)](evidence_elimination.md) for the full design and CLI parameters.
|
||||||
|
|||||||
@@ -2,10 +2,28 @@
|
|||||||
|
|
||||||
`obikmer` is a Rust tool for manipulation, counting, indexing, and set operations on DNA sequences represented as kmer sets.
|
`obikmer` is a Rust tool for manipulation, counting, indexing, and set operations on DNA sequences represented as kmer sets.
|
||||||
|
|
||||||
|
## Subcommands
|
||||||
|
|
||||||
|
| Subcommand | Purpose |
|
||||||
|
|-------------|---------|
|
||||||
|
| `superkmer` | Extract super-kmers from a sequence file and write to stdout |
|
||||||
|
| `index` | Build a complete genome index (scatter → dereplicate → count → layered MPHF) |
|
||||||
|
| `merge` | Merge multiple built indexes into one |
|
||||||
|
| `rebuild` | Filter and compact an existing index into a new single-layer index |
|
||||||
|
| `query` | Query an index with sequences and annotate matches |
|
||||||
|
| `dump` | Dump all indexed kmers as CSV (kmer + per-genome counts or presence) |
|
||||||
|
| `annotate` | Add or update genome metadata from a CSV file; or dump metadata as CSV |
|
||||||
|
| `distance` | Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees |
|
||||||
|
| `unitig` | Dump unitigs from a built index to stdout (debug) |
|
||||||
|
| `estimate` | Estimate approximate-index parameters (z, evidence bits, FP rates) before indexing |
|
||||||
|
| `reindex` | Convert an index's evidence in-place: exact ↔ approx |
|
||||||
|
|
||||||
## Constraints
|
## Constraints
|
||||||
|
|
||||||
- Target scale: individual genome datasets, tens of Gbases
|
- Target scale: individual genome datasets, tens of Gbases
|
||||||
- Maximum efficiency in computation, memory, and disk usage
|
- Maximum efficiency in computation, memory, and disk usage
|
||||||
|
- k odd, k ∈ [11, 31], fixed at runtime; kmer fits in a u64 (2 bits/base)
|
||||||
|
- Canonical form: `min(kmer, revcomp(kmer))` reduces strand-symmetric space by half
|
||||||
- Input formats: FASTA, FASTQ, gzip, streaming stdin
|
- Input formats: FASTA, FASTQ, gzip, streaming stdin
|
||||||
|
|
||||||
## Priority operations
|
## Priority operations
|
||||||
|
|||||||
+1
-1
@@ -9,7 +9,7 @@ A **kmer** is a DNA subsequence of fixed length k. Two constraints govern the ch
|
|||||||
|
|
||||||
## Super-kmers
|
## Super-kmers
|
||||||
|
|
||||||
A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k−1 nucleotides. Each kmer of the run carries the same **canonical minimizer**. The **canonical minimizer** of a kmer is the smallest value of `min(m-mer, revcomp(m-mer))` over all m-mers within the kmer (m < k, m odd), with the constraint that **non-degenerate m-mers are always preferred** over degenerate ones. A degenerate m-mer is one composed of a single repeated nucleotide (all-A, all-C, all-G, or all-T); such m-mers are selected only if no non-degenerate candidate exists in the window.
|
A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k−1 nucleotides, sharing the same **canonical minimizer**. The **canonical minimizer** of a kmer is the m-mer (m < k) whose canonical hash `hash_kmer(min(m-mer, revcomp(m-mer)))` is smallest over all m-mers in the kmer window. The hash function is a `mix64`-based bijection; selection is purely hash-ordered with no degeneracy filter. A super-kmer is capped at 256 nucleotides; a longer run is split at that boundary.
|
||||||
|
|
||||||
### Canonical super-kmers
|
### Canonical super-kmers
|
||||||
|
|
||||||
|
|||||||
@@ -17,18 +17,22 @@ The Watson-Crick complement of any base is its bitwise NOT on 2 bits: `complemen
|
|||||||
|
|
||||||
A kmer fits in a single `u64`. Nucleotide 0 occupies bits 63–62, nucleotide i occupies bits 63−2i and 62−2i, and the low 64−2k bits are zero. Extraction of nucleotide i (0 ≤ i < k): `(kmer >> (62 - 2*i)) & 0b11`.
|
A kmer fits in a single `u64`. Nucleotide 0 occupies bits 63–62, nucleotide i occupies bits 63−2i and 62−2i, and the low 64−2k bits are zero. Extraction of nucleotide i (0 ≤ i < k): `(kmer >> (62 - 2*i)) & 0b11`.
|
||||||
|
|
||||||
Reverse complement is computed via a **16-bit lookup table** (65 536 entries × 2 bytes = 128 KB, fits in L2 cache) storing the reverse-complement of every 8-base chunk.
|
Reverse complement is computed by **bit manipulation in four steps**, with no lookup table:
|
||||||
|
|
||||||
!!! abstract "Algorithm — Kmer reverse complement"
|
!!! abstract "Algorithm — Kmer reverse complement"
|
||||||
```text
|
```text
|
||||||
procedure KmerRevcomp(kmer, k):
|
procedure KmerRevcomp(kmer, k):
|
||||||
raw ← TABLE16[kmer & 0xFFFF] << 48
|
x ← ~kmer -- complement all bases
|
||||||
| TABLE16[(kmer >> 16) & 0xFFFF] << 32
|
x ← swap_bytes(x) -- reverse byte order
|
||||||
| TABLE16[(kmer >> 32) & 0xFFFF] << 16
|
x ← ((x >> 4) & 0x0F0F0F0F0F0F0F0F)
|
||||||
| TABLE16[(kmer >> 48) & 0xFFFF]
|
| ((x & 0x0F0F0F0F0F0F0F0F) << 4) -- swap nibbles within each byte
|
||||||
return raw << (64 - 2*k)
|
x ← ((x >> 2) & 0x3333333333333333)
|
||||||
|
| ((x & 0x3333333333333333) << 2) -- swap 2-bit pairs within each nibble
|
||||||
|
return x << (64 - 2*k) -- re-align to MSB
|
||||||
```
|
```
|
||||||
|
|
||||||
|
The three reorder passes together reverse the order of all 2-bit base codes across the 64-bit word. The bitwise NOT in the first step complements each base (A↔T, C↔G). The final left shift clears the low 64−2k padding bits.
|
||||||
|
|
||||||
The **canonical form** is the lexicographic minimum of the kmer and its reverse complement:
|
The **canonical form** is the lexicographic minimum of the kmer and its reverse complement:
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ The filter computes $\hat{H}(ws)$ for each word size ws from 1 to ws_max and ret
|
|||||||
|
|
||||||
$$\text{entropy}(kmer) = \min_{ws=1}^{ws_{\max}} \hat{H}(ws)$$
|
$$\text{entropy}(kmer) = \min_{ws=1}^{ws_{\max}} \hat{H}(ws)$$
|
||||||
|
|
||||||
A value near 0 indicates low complexity (e.g. AAAA…); near 1 indicates high complexity. A kmer is rejected if $\text{entropy}(kmer) \leq \theta$, where $\theta$ is a collection parameter. The minimum across word sizes ensures that any scale of repetition is detected independently: polyA is caught at ws=1, dinucleotide repeats at ws=2, etc.
|
A value near 0 indicates low complexity (e.g. AAAA…); near 1 indicates high complexity. A kmer is rejected if $\text{entropy}(kmer) < \theta$, where $\theta$ is a collection parameter (default 0.7). The minimum across word sizes ensures that any scale of repetition is detected independently: polyA is caught at ws=1, dinucleotide repeats at ws=2, etc.
|
||||||
|
|
||||||
## Interpretation as an effective number of classes
|
## Interpretation as an effective number of classes
|
||||||
|
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ impl KmerPartition {
|
|||||||
if !layer_dir.exists() { break; }
|
if !layer_dir.exists() { break; }
|
||||||
l += 1;
|
l += 1;
|
||||||
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
|
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
|
||||||
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?;
|
let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
|
||||||
|
|
||||||
let counts_dir = layer_dir.join("counts");
|
let counts_dir = layer_dir.join("counts");
|
||||||
let presence_dir = layer_dir.join("presence");
|
let presence_dir = layer_dir.join("presence");
|
||||||
@@ -97,7 +97,7 @@ impl KmerPartition {
|
|||||||
let layer_dir = index_dir.join(format!("layer_{layer}"));
|
let layer_dir = index_dir.join(format!("layer_{layer}"));
|
||||||
if !layer_dir.exists() { break; }
|
if !layer_dir.exists() { break; }
|
||||||
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
|
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
|
||||||
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?;
|
let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
|
||||||
|
|
||||||
let counts_dir = layer_dir.join("counts");
|
let counts_dir = layer_dir.join("counts");
|
||||||
let presence_dir = layer_dir.join("presence");
|
let presence_dir = layer_dir.join("presence");
|
||||||
|
|||||||
@@ -110,7 +110,7 @@ impl KmerPartition {
|
|||||||
uw.close()?;
|
uw.close()?;
|
||||||
|
|
||||||
if with_counts {
|
if with_counts {
|
||||||
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, |kmer| {
|
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, evidence, |kmer| {
|
||||||
match (&mphf1_opt, &counts1_opt) {
|
match (&mphf1_opt, &counts1_opt) {
|
||||||
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
|
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
|
||||||
_ => 1,
|
_ => 1,
|
||||||
@@ -118,13 +118,7 @@ impl KmerPartition {
|
|||||||
})
|
})
|
||||||
.map_err(olm_to_sk)?;
|
.map_err(olm_to_sk)?;
|
||||||
} else {
|
} else {
|
||||||
Layer::<()>::build(&layer_dir, block_bits).map_err(olm_to_sk)?;
|
Layer::<()>::build(&layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
|
||||||
}
|
|
||||||
|
|
||||||
// For approximate evidence: replace the exact evidence bundle with a
|
|
||||||
// fingerprint. For exact evidence, build() already wrote it.
|
|
||||||
if let EvidenceKind::Approx { b, z } = evidence {
|
|
||||||
Layer::<()>::build_approx_evidence(&layer_dir, *b, *z).map_err(olm_to_sk)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write meta.json in the index/ directory so LayeredMap::open works
|
// Write meta.json in the index/ directory so LayeredMap::open works
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder,
|
|||||||
PersistentCompactIntVecBuilder};
|
PersistentCompactIntVecBuilder};
|
||||||
use obikseq::CanonicalKmer;
|
use obikseq::CanonicalKmer;
|
||||||
use obiskio::{SKError, SKResult, UnitigFileReader};
|
use obiskio::{SKError, SKResult, UnitigFileReader};
|
||||||
use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError};
|
use obilayeredmap::{EvidenceKind, Layer, LayeredMap, MphfLayer, OLMError};
|
||||||
use obilayeredmap::meta::PartitionMeta;
|
use obilayeredmap::meta::PartitionMeta;
|
||||||
|
|
||||||
use crate::partition::KmerPartition;
|
use crate::partition::KmerPartition;
|
||||||
@@ -195,7 +195,7 @@ impl KmerPartition {
|
|||||||
for l in 0..src_meta.n_layers {
|
for l in 0..src_meta.n_layers {
|
||||||
let unitigs_path = src_index_dir
|
let unitigs_path = src_index_dir
|
||||||
.join(format!("layer_{l}")).join("unitigs.bin");
|
.join(format!("layer_{l}")).join("unitigs.bin");
|
||||||
let reader = UnitigFileReader::open(&unitigs_path)?;
|
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||||
if dst_map.query(kmer).is_none() {
|
if dst_map.query(kmer).is_none() {
|
||||||
g.push(kmer);
|
g.push(kmer);
|
||||||
@@ -217,7 +217,7 @@ impl KmerPartition {
|
|||||||
uw.write(&unitig)?;
|
uw.write(&unitig)?;
|
||||||
}
|
}
|
||||||
uw.close()?;
|
uw.close()?;
|
||||||
Layer::<()>::build(&new_layer_dir, block_bits).map_err(olm_to_sk)?;
|
Layer::<()>::build(&new_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
|
||||||
}
|
}
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
||||||
@@ -303,7 +303,7 @@ impl KmerPartition {
|
|||||||
|
|
||||||
for l in 0..src_meta.n_layers {
|
for l in 0..src_meta.n_layers {
|
||||||
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
||||||
let reader = UnitigFileReader::open(&src_layer_dir.join("unitigs.bin"))?;
|
let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?;
|
||||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||||
|
|
||||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use obicompactvec::{PersistentBitMatrixBuilder,
|
|||||||
PersistentCompactIntVecBuilder};
|
PersistentCompactIntVecBuilder};
|
||||||
use obidebruinj::GraphDeBruijn;
|
use obidebruinj::GraphDeBruijn;
|
||||||
use obiskio::{SKError, SKResult, UnitigFileReader};
|
use obiskio::{SKError, SKResult, UnitigFileReader};
|
||||||
use obilayeredmap::{Layer, MphfLayer, OLMError};
|
use obilayeredmap::{EvidenceKind, Layer, MphfLayer, OLMError};
|
||||||
use obilayeredmap::meta::PartitionMeta;
|
use obilayeredmap::meta::PartitionMeta;
|
||||||
|
|
||||||
use crate::filter::KmerFilter;
|
use crate::filter::KmerFilter;
|
||||||
@@ -116,7 +116,7 @@ impl KmerPartition {
|
|||||||
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
||||||
if !unitigs_path.exists() { continue; }
|
if !unitigs_path.exists() { continue; }
|
||||||
|
|
||||||
let reader = UnitigFileReader::open(&unitigs_path)?;
|
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||||
|
|
||||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||||
@@ -146,7 +146,7 @@ impl KmerPartition {
|
|||||||
uw.close()?;
|
uw.close()?;
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
||||||
Layer::<()>::build(&dst_layer_dir, block_bits).map_err(olm_to_sk)?;
|
Layer::<()>::build(&dst_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
|
||||||
let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?;
|
let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?;
|
||||||
|
|
||||||
// ── Prepare matrix builders (one column per genome) ───────────────────
|
// ── Prepare matrix builders (one column per genome) ───────────────────
|
||||||
@@ -181,7 +181,7 @@ impl KmerPartition {
|
|||||||
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
||||||
if !unitigs_path.exists() { continue; }
|
if !unitigs_path.exists() { continue; }
|
||||||
|
|
||||||
let reader = UnitigFileReader::open(&unitigs_path)?;
|
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||||
|
|
||||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||||
|
|||||||
@@ -102,8 +102,8 @@ impl<D: LayerData> Layer<D> {
|
|||||||
// ── Mode 1 — set membership ───────────────────────────────────────────────────
|
// ── Mode 1 — set membership ───────────────────────────────────────────────────
|
||||||
|
|
||||||
impl Layer<()> {
|
impl Layer<()> {
|
||||||
pub fn build(out_dir: &Path, block_bits: u8) -> OLMResult<usize> {
|
pub fn build(out_dir: &Path, block_bits: u8, evidence_kind: &EvidenceKind) -> OLMResult<usize> {
|
||||||
MphfLayer::build(out_dir, block_bits, &mut |_, _| Ok(()))
|
MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |_, _| Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a presence matrix for a set-membership layer (first merge).
|
/// Create a presence matrix for a set-membership layer (first merge).
|
||||||
@@ -126,6 +126,7 @@ impl Layer<PersistentCompactIntMatrix> {
|
|||||||
pub fn build(
|
pub fn build(
|
||||||
out_dir: &Path,
|
out_dir: &Path,
|
||||||
block_bits: u8,
|
block_bits: u8,
|
||||||
|
evidence_kind: &EvidenceKind,
|
||||||
count_of: impl Fn(CanonicalKmer) -> u32,
|
count_of: impl Fn(CanonicalKmer) -> u32,
|
||||||
) -> OLMResult<usize> {
|
) -> OLMResult<usize> {
|
||||||
let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||||
@@ -133,7 +134,7 @@ impl Layer<PersistentCompactIntMatrix> {
|
|||||||
let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir)
|
let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir)
|
||||||
.map_err(OLMError::Io)?;
|
.map_err(OLMError::Io)?;
|
||||||
let mut col = mb.add_col().map_err(OLMError::Io)?;
|
let mut col = mb.add_col().map_err(OLMError::Io)?;
|
||||||
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| {
|
let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
|
||||||
col.set(slot, count_of(kmer));
|
col.set(slot, count_of(kmer));
|
||||||
Ok(())
|
Ok(())
|
||||||
})?;
|
})?;
|
||||||
@@ -145,9 +146,10 @@ impl Layer<PersistentCompactIntMatrix> {
|
|||||||
pub fn build_from_map(
|
pub fn build_from_map(
|
||||||
out_dir: &Path,
|
out_dir: &Path,
|
||||||
block_bits: u8,
|
block_bits: u8,
|
||||||
|
evidence_kind: &EvidenceKind,
|
||||||
counts: &HashMap<CanonicalKmer, u32>,
|
counts: &HashMap<CanonicalKmer, u32>,
|
||||||
) -> OLMResult<usize> {
|
) -> OLMResult<usize> {
|
||||||
Self::build(out_dir, block_bits, |kmer| counts.get(&kmer).copied().unwrap_or(0))
|
Self::build(out_dir, block_bits, evidence_kind, |kmer| counts.get(&kmer).copied().unwrap_or(0))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -177,6 +179,7 @@ impl Layer<PersistentBitMatrix> {
|
|||||||
pub fn build_presence(
|
pub fn build_presence(
|
||||||
out_dir: &Path,
|
out_dir: &Path,
|
||||||
block_bits: u8,
|
block_bits: u8,
|
||||||
|
evidence_kind: &EvidenceKind,
|
||||||
n_genomes: usize,
|
n_genomes: usize,
|
||||||
present_in: impl Fn(CanonicalKmer, usize) -> bool,
|
present_in: impl Fn(CanonicalKmer, usize) -> bool,
|
||||||
) -> OLMResult<usize> {
|
) -> OLMResult<usize> {
|
||||||
@@ -186,7 +189,7 @@ impl Layer<PersistentBitMatrix> {
|
|||||||
let mut cols: Vec<_> = (0..n_genomes)
|
let mut cols: Vec<_> = (0..n_genomes)
|
||||||
.map(|_| mb.add_col().map_err(OLMError::Io))
|
.map(|_| mb.add_col().map_err(OLMError::Io))
|
||||||
.collect::<OLMResult<_>>()?;
|
.collect::<OLMResult<_>>()?;
|
||||||
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| {
|
let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
|
||||||
for (g, col) in cols.iter_mut().enumerate() {
|
for (g, col) in cols.iter_mut().enumerate() {
|
||||||
col.set(slot, present_in(kmer, g));
|
col.set(slot, present_in(kmer, g));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
|
|||||||
use obicompactvec::PersistentCompactIntMatrix;
|
use obicompactvec::PersistentCompactIntMatrix;
|
||||||
use obikseq::CanonicalKmer;
|
use obikseq::CanonicalKmer;
|
||||||
use obiskio::{UnitigFileWriter, DEFAULT_BLOCK_BITS};
|
use obiskio::{UnitigFileWriter, DEFAULT_BLOCK_BITS};
|
||||||
|
use crate::meta::EvidenceKind;
|
||||||
|
|
||||||
use crate::error::OLMResult;
|
use crate::error::OLMResult;
|
||||||
use crate::layer::{Hit, Layer, LayerData};
|
use crate::layer::{Hit, Layer, LayerData};
|
||||||
@@ -90,7 +91,7 @@ impl LayeredMap<()> {
|
|||||||
pub fn push_layer(&mut self) -> OLMResult<usize> {
|
pub fn push_layer(&mut self) -> OLMResult<usize> {
|
||||||
let i = self.layers.len();
|
let i = self.layers.len();
|
||||||
let dir = layer_dir(&self.root, i);
|
let dir = layer_dir(&self.root, i);
|
||||||
Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS)?;
|
Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact)?;
|
||||||
self.append_layer()?;
|
self.append_layer()?;
|
||||||
Ok(i)
|
Ok(i)
|
||||||
}
|
}
|
||||||
@@ -102,7 +103,7 @@ impl LayeredMap<PersistentCompactIntMatrix> {
|
|||||||
pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
|
pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
|
||||||
let i = self.layers.len();
|
let i = self.layers.len();
|
||||||
let dir = layer_dir(&self.root, i);
|
let dir = layer_dir(&self.root, i);
|
||||||
Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, count_of)?;
|
Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, count_of)?;
|
||||||
self.append_layer()?;
|
self.append_layer()?;
|
||||||
Ok(i)
|
Ok(i)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use std::path::Path;
|
|||||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||||
use epserde::prelude::*;
|
use epserde::prelude::*;
|
||||||
use obikseq::CanonicalKmer;
|
use obikseq::CanonicalKmer;
|
||||||
use obiskio::{UnitigFileReader, UnitigFileWriter, build_unitig_idx};
|
use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx};
|
||||||
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
|
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
|
||||||
|
|
||||||
use crate::error::{OLMError, OLMResult};
|
use crate::error::{OLMError, OLMResult};
|
||||||
@@ -198,22 +198,35 @@ impl MphfLayer {
|
|||||||
Ok(n)
|
Ok(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build MPHF and exact evidence from the unitigs file already present in
|
/// Build MPHF then evidence from the unitigs file already present in `dir`.
|
||||||
/// `dir`. Calls `fill_slot(slot, kmer)` once per kmer for DataStore
|
///
|
||||||
/// population. Returns the number of kmers indexed.
|
/// - Exact: `.idx` is built for pass-1 parallel construction and kept for
|
||||||
|
/// query-time kmer verification. `evidence.bin` is written.
|
||||||
|
/// - Approx: pass-1 uses `open_sequential` + `par_bridge` — no `.idx` is
|
||||||
|
/// ever created. `fingerprint.bin` is written.
|
||||||
|
///
|
||||||
|
/// `fill_slot(slot, kmer)` is called once per kmer in both modes.
|
||||||
pub(crate) fn build(
|
pub(crate) fn build(
|
||||||
dir: &Path,
|
dir: &Path,
|
||||||
block_bits: u8,
|
block_bits: u8,
|
||||||
|
evidence_kind: &EvidenceKind,
|
||||||
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
|
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
|
||||||
) -> OLMResult<usize> {
|
) -> OLMResult<usize> {
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
|
|
||||||
let unitig_path = dir.join(UNITIGS_FILE);
|
let unitig_path = dir.join(UNITIGS_FILE);
|
||||||
|
|
||||||
build_unitig_idx(&unitig_path, block_bits)?;
|
match evidence_kind {
|
||||||
|
// ── Exact path ────────────────────────────────────────────────────
|
||||||
let unitigs = UnitigFileReader::open(&unitig_path)?;
|
// .idx is built LAST, once evidence.bin is written, so it is never
|
||||||
let n = unitigs.n_kmers();
|
// present during construction — only at query time.
|
||||||
|
EvidenceKind::Exact => {
|
||||||
|
let n = UnitigFileReader::open_sequential(&unitig_path)?.n_kmers();
|
||||||
|
let keys = CanonicalKmerIter::new(&unitig_path)
|
||||||
|
.map_err(|e| match e {
|
||||||
|
obiskio::SKError::Io(io) => OLMError::Io(io),
|
||||||
|
e => OLMError::InvalidLayer(e.to_string()),
|
||||||
|
})?;
|
||||||
|
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
fs::File::create(dir.join(EVIDENCE_FILE))?;
|
fs::File::create(dir.join(EVIDENCE_FILE))?;
|
||||||
@@ -223,20 +236,18 @@ impl MphfLayer {
|
|||||||
mphf.store(&dir.join(MPHF_FILE))
|
mphf.store(&dir.join(MPHF_FILE))
|
||||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||||
LayerMeta::exact().save(dir)?;
|
LayerMeta::exact().save(dir)?;
|
||||||
|
build_unitig_idx(&unitig_path, block_bits)?;
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass 1 — build MPHF (parallel, random access via .idx)
|
// Pass 1 — MPHF construction via clonable mmap iterator
|
||||||
let keys = (0..unitigs.len())
|
|
||||||
.into_par_iter()
|
|
||||||
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
|
|
||||||
let mphf: Mphf =
|
let mphf: Mphf =
|
||||||
Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
|
Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::<CubicEps>::default());
|
||||||
mphf.store(&dir.join(MPHF_FILE))
|
mphf.store(&dir.join(MPHF_FILE))
|
||||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||||
|
|
||||||
// Pass 2 — fill evidence + mode-specific data via callback
|
// Pass 2 — sequential: fill evidence.bin + callback
|
||||||
let unitigs2 = UnitigFileReader::open(&unitig_path)?;
|
let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
|
||||||
let mut ev = EvidenceWriter::new(n);
|
let mut ev = EvidenceWriter::new(n);
|
||||||
let mut seen = vec![0u8; (n + 7) / 8];
|
let mut seen = vec![0u8; (n + 7) / 8];
|
||||||
|
|
||||||
@@ -257,6 +268,65 @@ impl MphfLayer {
|
|||||||
|
|
||||||
ev.write(&dir.join(EVIDENCE_FILE))?;
|
ev.write(&dir.join(EVIDENCE_FILE))?;
|
||||||
LayerMeta::exact().save(dir)?;
|
LayerMeta::exact().save(dir)?;
|
||||||
|
// .idx built last: strictly for query-time kmer verification
|
||||||
|
build_unitig_idx(&unitig_path, block_bits)?;
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Approx path ───────────────────────────────────────────────────
|
||||||
|
// No .idx is created at any point.
|
||||||
|
EvidenceKind::Approx { b, z } => {
|
||||||
|
let unitigs = UnitigFileReader::open_sequential(&unitig_path)?;
|
||||||
|
let n = unitigs.n_kmers();
|
||||||
|
|
||||||
|
if n == 0 {
|
||||||
|
FingerprintVecWriter::new(0, *b).write(&dir.join(FINGERPRINT_FILE))?;
|
||||||
|
let mphf: Mphf =
|
||||||
|
Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
|
||||||
|
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
|
||||||
|
mphf.store(&dir.join(MPHF_FILE))
|
||||||
|
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||||
|
LayerMeta::approx(*b, *z).save(dir)?;
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pass 1 — MPHF construction via mmap-backed clonable iterator.
|
||||||
|
// No .idx is created. par_bridge() parallelises the sequential scan;
|
||||||
|
// Clone on CanonicalKmerRawIter shares the Arc<Mmap> and resets to pos 0.
|
||||||
|
let keys = CanonicalKmerIter::new(&unitig_path)
|
||||||
|
.map_err(|e| match e {
|
||||||
|
obiskio::SKError::Io(io) => OLMError::Io(io),
|
||||||
|
e => OLMError::InvalidLayer(e.to_string()),
|
||||||
|
})?;
|
||||||
|
let mphf: Mphf =
|
||||||
|
Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::<CubicEps>::default());
|
||||||
|
mphf.store(&dir.join(MPHF_FILE))
|
||||||
|
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||||
|
|
||||||
|
// Pass 2 — sequential: fill fingerprint.bin + callback
|
||||||
|
let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
|
||||||
|
let mut fw = FingerprintVecWriter::new(n, *b);
|
||||||
|
let mut seen = vec![0u8; (n + 7) / 8];
|
||||||
|
|
||||||
|
for kmer in unitigs2.iter_canonical_kmers() {
|
||||||
|
let slot = mphf.index(&kmer.raw());
|
||||||
|
if slot >= n {
|
||||||
|
return Err(OLMError::Mphf("slot out of bounds".into()));
|
||||||
|
}
|
||||||
|
let byte = slot / 8;
|
||||||
|
let bit = 1u8 << (slot % 8);
|
||||||
|
if seen[byte] & bit != 0 {
|
||||||
|
return Err(OLMError::Mphf("duplicate slot".into()));
|
||||||
|
}
|
||||||
|
seen[byte] |= bit;
|
||||||
|
fw.set(slot, kmer.seq_hash());
|
||||||
|
fill_slot(slot, kmer)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
fw.write(&dir.join(FINGERPRINT_FILE))?;
|
||||||
|
LayerMeta::approx(*b, *z).save(dir)?;
|
||||||
Ok(n)
|
Ok(n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use super::*;
|
|||||||
use obicompactvec::PersistentCompactIntMatrix;
|
use obicompactvec::PersistentCompactIntMatrix;
|
||||||
use obikseq::{set_k, Kmer, Sequence as _, Unitig};
|
use obikseq::{set_k, Kmer, Sequence as _, Unitig};
|
||||||
use obiskio::DEFAULT_BLOCK_BITS;
|
use obiskio::DEFAULT_BLOCK_BITS;
|
||||||
|
use crate::meta::EvidenceKind;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
fn write_unitigs(dir: &Path, seqs: &[&[u8]]) {
|
fn write_unitigs(dir: &Path, seqs: &[&[u8]]) {
|
||||||
@@ -24,7 +25,7 @@ fn build_and_query_all_kmers_found() {
|
|||||||
let dir = tempdir().unwrap();
|
let dir = tempdir().unwrap();
|
||||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||||
let kmers = all_canonical_kmers(dir.path());
|
let kmers = all_canonical_kmers(dir.path());
|
||||||
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap();
|
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
|
||||||
let layer = Layer::<()>::open(dir.path()).unwrap();
|
let layer = Layer::<()>::open(dir.path()).unwrap();
|
||||||
for kmer in kmers {
|
for kmer in kmers {
|
||||||
assert!(layer.query(kmer).is_some(), "kmer should be present");
|
assert!(layer.query(kmer).is_some(), "kmer should be present");
|
||||||
@@ -42,6 +43,7 @@ fn counts_are_stored_and_retrieved() {
|
|||||||
Layer::<PersistentCompactIntMatrix>::build(
|
Layer::<PersistentCompactIntMatrix>::build(
|
||||||
dir.path(),
|
dir.path(),
|
||||||
DEFAULT_BLOCK_BITS,
|
DEFAULT_BLOCK_BITS,
|
||||||
|
&EvidenceKind::Exact,
|
||||||
|kmer| count_map.get(&kmer).copied().unwrap_or(0),
|
|kmer| count_map.get(&kmer).copied().unwrap_or(0),
|
||||||
).unwrap();
|
).unwrap();
|
||||||
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
|
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
|
||||||
@@ -56,7 +58,7 @@ fn query_absent_returns_none() {
|
|||||||
set_k(4);
|
set_k(4);
|
||||||
let dir = tempdir().unwrap();
|
let dir = tempdir().unwrap();
|
||||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||||
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap();
|
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
|
||||||
let layer = Layer::<()>::open(dir.path()).unwrap();
|
let layer = Layer::<()>::open(dir.path()).unwrap();
|
||||||
let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical();
|
let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical();
|
||||||
assert!(layer.query(absent).is_none());
|
assert!(layer.query(absent).is_none());
|
||||||
@@ -67,7 +69,7 @@ fn open_after_build_is_consistent() {
|
|||||||
set_k(4);
|
set_k(4);
|
||||||
let dir = tempdir().unwrap();
|
let dir = tempdir().unwrap();
|
||||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||||
let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, |_| 7).unwrap();
|
let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, |_| 7).unwrap();
|
||||||
assert_eq!(n, 4);
|
assert_eq!(n, 4);
|
||||||
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
|
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
|
||||||
let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical();
|
let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical();
|
||||||
|
|||||||
@@ -8,7 +8,8 @@ pub use error::{SKError, SKResult};
|
|||||||
pub use meta::SKFileMeta;
|
pub use meta::SKFileMeta;
|
||||||
pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with};
|
pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with};
|
||||||
pub use reader::{SKFileIter, SKFileReader};
|
pub use reader::{SKFileIter, SKFileReader};
|
||||||
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS};
|
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS,
|
||||||
|
CanonicalKmerIter};
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{BufWriter, Write as _};
|
use std::io::{BufWriter, Write as _};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
use obikseq::{CanonicalKmer, Kmer, Unitig};
|
use obikseq::{CanonicalKmer, Kmer, Unitig};
|
||||||
@@ -439,6 +440,85 @@ fn extract_kmer_raw(bytes: &[u8], j: usize, k: usize) -> u64 {
|
|||||||
raw << (64 - 2 * k)
|
raw << (64 - 2 * k)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── CanonicalKmerRawIter ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// ── CanonicalKmerIter ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Sequential iterator over [`CanonicalKmer`] from a `unitigs.bin` file.
|
||||||
|
///
|
||||||
|
/// Holds an `Arc<Mmap>` so that `Clone` is O(1): both copies share the same
|
||||||
|
/// memory-mapped pages. Cloning resets the cursor to position 0 — this lets
|
||||||
|
/// ptr_hash's `new_from_par_iter` (which requires a `Clone`-able parallel
|
||||||
|
/// iterator via `par_bridge()`) make multiple passes without ever creating
|
||||||
|
/// a `.idx` file.
|
||||||
|
pub struct CanonicalKmerIter {
|
||||||
|
mmap: Arc<Mmap>,
|
||||||
|
k: usize,
|
||||||
|
chunk_pos: usize, // byte offset of the current chunk header
|
||||||
|
data_pos: usize, // byte offset of the current chunk's sequence bytes
|
||||||
|
n_kmers: usize, // kmers in current chunk
|
||||||
|
kmer_idx: usize, // next kmer index to yield within the current chunk
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CanonicalKmerIter {
|
||||||
|
pub fn new(path: &Path) -> SKResult<Self> {
|
||||||
|
let file = File::open(path).map_err(SKError::Io)?;
|
||||||
|
let mmap = Arc::new(unsafe { Mmap::map(&file).map_err(SKError::Io)? });
|
||||||
|
let k = obikseq::params::k();
|
||||||
|
let mut s = Self { mmap, k, chunk_pos: 0, data_pos: 0, n_kmers: 0, kmer_idx: 0 };
|
||||||
|
s.load_chunk();
|
||||||
|
Ok(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn load_chunk(&mut self) {
|
||||||
|
if self.chunk_pos < self.mmap.len() {
|
||||||
|
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
|
||||||
|
self.n_kmers = seql_minus_k + 1;
|
||||||
|
self.data_pos = self.chunk_pos + 1;
|
||||||
|
self.kmer_idx = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for CanonicalKmerIter {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
let mut c = Self {
|
||||||
|
mmap: Arc::clone(&self.mmap),
|
||||||
|
k: self.k,
|
||||||
|
chunk_pos: 0,
|
||||||
|
data_pos: 0,
|
||||||
|
n_kmers: 0,
|
||||||
|
kmer_idx: 0,
|
||||||
|
};
|
||||||
|
c.load_chunk();
|
||||||
|
c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for CanonicalKmerIter {
|
||||||
|
type Item = CanonicalKmer;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn next(&mut self) -> Option<CanonicalKmer> {
|
||||||
|
loop {
|
||||||
|
if self.chunk_pos >= self.mmap.len() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
if self.kmer_idx < self.n_kmers {
|
||||||
|
let raw = extract_kmer_raw(&self.mmap[self.data_pos..], self.kmer_idx, self.k);
|
||||||
|
let canon = canonical_raw(raw, self.k);
|
||||||
|
self.kmer_idx += 1;
|
||||||
|
return Some(CanonicalKmer::from_raw_unchecked(canon));
|
||||||
|
}
|
||||||
|
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
|
||||||
|
let byte_len = (seql_minus_k + self.k + 3) / 4;
|
||||||
|
self.chunk_pos += 1 + byte_len;
|
||||||
|
self.load_chunk();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
#[path = "tests/unitig_index.rs"]
|
#[path = "tests/unitig_index.rs"]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|||||||
Reference in New Issue
Block a user