Push vqmnuyrkpxot #9
@@ -8,6 +8,9 @@ Ne modifier aucun fichier à moins d'une demande explicite de modification. En p
|
||||
**Règle absolue : ne jamais substituer une dépendance ou une bibliothèque sans validation explicite.**
|
||||
Si une dépendance demandée pose problème (erreur de compilation, bug, API manquante), exposer le problème et proposer des alternatives — ne jamais switcher silencieusement vers une autre bibliothèque. Le choix des dépendances est une décision d'architecture qui appartient au développeur.
|
||||
|
||||
**Règle absolue : le code existant est une hypothèse, pas une vérité.**
|
||||
Quand une nouvelle construction (type, itérateur, abstraction) rend du code historique injustifié, le signaler immédiatement et proposer de le supprimer — ne pas conserver les deux en parallèle par inertie. Le développeur demande explicitement de remettre en cause le code base : ne pas attendre qu'il insiste.
|
||||
|
||||
Tu maintiens en **anglais**, dense et sans remplissage, les documents suivants :
|
||||
- `docmd/index.md` — document de discussion de base, enrichi progressivement au fil de nos échanges ; il reflète l'état courant de la réflexion sur le projet
|
||||
- les autres fichiers Markdown dans `docmd/` selon leur thème respectif
|
||||
|
||||
+81
-48
@@ -19,34 +19,80 @@ Given a set of query sequences, determine for each sequence how many of its k-me
|
||||
The query follows the same superkmer-based partitioning strategy used at indexing time.
|
||||
|
||||
```
|
||||
for each query sequence:
|
||||
decompose into superkmers (non-ACGT breaks, same minimiser scheme as indexing)
|
||||
for each superkmer:
|
||||
route to partition p via minimiser hash
|
||||
for each kmer in the superkmer:
|
||||
lookup kmer in partition p (MPHF → evidence check → matrix row)
|
||||
accumulate result into per-sequence accumulators
|
||||
emit annotated sequence
|
||||
for each batch of sequences:
|
||||
build QueryBatch: decompose all sequences into superkmers, deduplicate
|
||||
split superkmers by partition via minimiser hash
|
||||
for each partition p:
|
||||
query_partition(p, superkmers_routed_to_p)
|
||||
→ load QueryLayer(s) for p
|
||||
→ for each kmer in each superkmer: MphfLayer::find(kmer)
|
||||
broadcast results back to each (seq_idx, kmer_offset) that referenced the superkmer
|
||||
emit annotated sequences
|
||||
```
|
||||
|
||||
Parallelism is **per sequence**: each worker thread handles all partitions of one sequence independently. This avoids cross-thread coordination when merging partial results and keeps memory usage proportional to the number of concurrent sequences rather than to the number of partitions.
|
||||
Superkmers that appear more than once in the batch (same sequence or across sequences) are deduplicated: each unique `RoutableSuperKmer` is queried once per partition, and the result is broadcast to every `SKDesc` entry that references it.
|
||||
|
||||
Parallelism is **not yet active** in the current implementation: batches are processed sequentially on a single thread despite the `--threads` flag being parsed. The `QueryBatch` / `split_by_partition` design is structured to support per-partition parallelism in a future iteration.
|
||||
|
||||
---
|
||||
|
||||
## Exact vs. approximate matching
|
||||
## Layer lookup: `MphfLayer::find`
|
||||
|
||||
### Exact (default)
|
||||
`MphfLayer::open` reads `layer_meta.json` and loads either exact or approximate evidence. The caller (`QueryLayer::find`) never chooses the dispatch path — it is fixed at open time by `LayerEvidence`:
|
||||
|
||||
Standard MPHF lookup followed by evidence check. O(1) per k-mer.
|
||||
```rust
|
||||
pub fn find(&self, kmer: CanonicalKmer) -> Option<usize> {
|
||||
match &self.ev {
|
||||
LayerEvidence::Exact { .. } => self.find_exact(kmer),
|
||||
LayerEvidence::Approx { .. } => self.find_approx(kmer),
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 1-mismatch (`--mismatch` flag)
|
||||
### Exact layers
|
||||
|
||||
For each k-mer of the query, generate all `3·k` single-substitution variants. Each variant is canonicalised and looked up independently in the index. If one or more variants are found, their per-genome rows are **summed** into the result for that k-mer position.
|
||||
`find_exact` maps the k-mer through the MPHF to a slot, then calls `UnitigFileReader::verify_canonical_kmer(chunk_id, rank, kmer)` to confirm the stored k-mer matches. Zero false positives. Requires `UnitigFileReader::open()` (random-access via `.idx`); `open_sequential()` cannot serve random-access verification.
|
||||
|
||||
- If a k-mer matches exactly AND one of its variants also matches (distinct k-mers in the index), both contributions are accumulated.
|
||||
- Exact and approximate matches are tracked separately in the output (see annotation schema below).
|
||||
- The superkmer routing optimisation is **not** used in 1-mismatch mode: each variant is looked up directly via its own minimiser.
|
||||
- Cost: up to `3·k` MPHF probes per k-mer position vs. 1 in exact mode.
|
||||
### Approximate layers
|
||||
|
||||
`find_approx` maps the k-mer through the MPHF, then checks a stored `b`-bit fingerprint of the canonical hash. False-positive rate: **1/2^b per k-mer query**. No `.idx` file is needed; the layer carries only `fingerprint.bin`.
|
||||
|
||||
For a query window of z consecutive k-mers (Findere scheme), the false-positive rate per window is **1/2^(b·z)**. The `z` parameter is recorded in `layer_meta.json` at build time but is not enforced during querying — the caller is responsible for interpreting window-level results accordingly.
|
||||
|
||||
### `QueryLayer` variant selection
|
||||
|
||||
`QueryLayer::open` in `query_layer.rs` selects the data matrix to pair with `MphfLayer`:
|
||||
|
||||
| Condition | Variant | Data returned per k-mer |
|
||||
|---|---|---|
|
||||
| `with_counts=true` and `counts/` exists | `Count` | raw count per genome |
|
||||
| `presence/` exists | `Presence` | 0/1 per genome (bit matrix) |
|
||||
| only `counts/` exists | `Count` | counts used as-is |
|
||||
| neither exists | `SetOnly` | 1 for every genome |
|
||||
|
||||
---
|
||||
|
||||
## `open()` vs `open_sequential()`
|
||||
|
||||
`UnitigFileReader::open()` loads the `.idx` block-offset table, enabling random access to individual unitig chunks. It is required whenever `verify_canonical_kmer` is called (exact layers at query time).
|
||||
|
||||
`UnitigFileReader::open_sequential()` skips the `.idx` and supports only forward iteration. It is sufficient for:
|
||||
- build passes that scan all unitigs sequentially (`build_exact_evidence`, `build_approx_evidence`);
|
||||
- the `unitig` subcommand, which iterates and prints unitigs without random access.
|
||||
|
||||
`KmerIndex::open()` (called by `query::run`) triggers `MphfLayer::open` for each layer, which calls `UnitigFileReader::open()` for exact layers. Approximate layers do not open a unitig reader at all.
|
||||
|
||||
---
|
||||
|
||||
## Presence / count mode at query time
|
||||
|
||||
The `--force-presence` flag and `--presence-threshold` control how per-genome values are accumulated, independently of what the index stores:
|
||||
|
||||
```
|
||||
genome_totals[g] += if presence { u32::from(v >= threshold) } else { v }
|
||||
```
|
||||
|
||||
`presence` is true when `--force-presence` is set or when the index has no counts (`!with_counts`). The default `presence_threshold` is 1, so any nonzero count counts as a match.
|
||||
|
||||
---
|
||||
|
||||
@@ -55,57 +101,44 @@ For each k-mer of the query, generate all `3·k` single-substitution variants. E
|
||||
Output sequences are written in **OBITools4 format**: the original sequence with a JSON annotation map in the title line.
|
||||
|
||||
```
|
||||
>read_id {"kmer_total":150,"kmer_found":59,...}
|
||||
>read_id {"kmer_count":59,"kmer_strict_matches":{"genome_a":42,"genome_b":7,...}}
|
||||
ATCGATCG...
|
||||
```
|
||||
|
||||
Genome order in all list-valued annotations follows the genome order recorded in `index.meta`.
|
||||
Genome keys in `kmer_strict_matches` are genome labels from `index.meta`. Key order follows iteration order of `meta.genomes`.
|
||||
|
||||
---
|
||||
|
||||
## Annotation schema
|
||||
|
||||
### Summary mode (default)
|
||||
## Annotation schema (current implementation)
|
||||
|
||||
| Key | Type | Condition | Semantics |
|
||||
|---|---|---|---|
|
||||
| `kmer_total` | int | always | total k-mers in the (masked) sequence |
|
||||
| `kmer_found` | int | always | k-mers with at least one match (exact or approx) |
|
||||
| `kmer_missing` | int | `--count-missing` | k-mers absent from the index |
|
||||
| `kmer_match` | list[int] | always | per-genome matched k-mer count (exact + approx) |
|
||||
| `kmer_match_exact` | list[int] | `--mismatch` | per-genome exact match count |
|
||||
| `kmer_match_approx` | list[int] | `--mismatch` | per-genome approx match count |
|
||||
| `count_match` | list[int] | count index | per-genome sum of index counts for matched k-mers |
|
||||
| `kmer_count` | int | always | k-mers with at least one match |
|
||||
| `kmer_missing` | int | `--count-missing` | k-mers absent from every layer |
|
||||
| `kmer_strict_matches` | object | always | per-genome accumulated value (label → count or 0/1) |
|
||||
|
||||
`kmer_match[i]` is the number of k-mer positions in the query that contribute at least one match to genome i. In 1-mismatch mode, a single k-mer position can contribute to multiple genomes if several of its variants are present in the index.
|
||||
`kmer_count` counts matched k-mer positions (incremented once per `Some(row)` hit regardless of how many genomes are covered). `kmer_missing` counts `None` hits.
|
||||
|
||||
`count_match[i]` sums raw index counts across all matched k-mer positions for genome i. Only meaningful for count indexes.
|
||||
|
||||
### Detail mode (`--detail`)
|
||||
|
||||
All summary keys, plus per-position coverage vectors — one list per genome, length `len(sequence) − k + 1`:
|
||||
|
||||
| Key | Type | Condition | Semantics |
|
||||
|---|---|---|---|
|
||||
| `cov_<i>` | list[int] | `--detail` | coverage at each k-mer position for genome i; raw count (count index) or 0/1 (presence index); 0 if absent |
|
||||
| `cov_exact_<i>` | list[int] | `--detail` + `--mismatch` | exact-match contribution per position |
|
||||
| `cov_approx_<i>` | list[int] | `--detail` + `--mismatch` | approx-match contribution per position |
|
||||
|
||||
Genome indices in key names are 0-based integers matching the `index.meta` genome order. Genome labels are not used as key names to avoid issues with special characters in long or complex genome identifiers.
|
||||
**Note on doc/impl divergence:** the doc previously used keys `kmer_total`, `kmer_found`, and `kmer_match` (list). The implementation uses `kmer_count` (int, matched only) and `kmer_strict_matches` (object keyed by genome label). `--mismatch` and `--detail` are parsed but not yet implemented and emit a warning.
|
||||
|
||||
---
|
||||
|
||||
## CLI
|
||||
|
||||
```
|
||||
obikmer query -i <index> [--summary | --detail] [--mismatch] [--count-missing] <query.fa>
|
||||
obikmer query -i <index> [--detail] [--mismatch] [--count-missing]
|
||||
[--force-presence] [--presence-threshold <n>]
|
||||
[-T <threads>] <query.fa> [<query2.fa> ...]
|
||||
```
|
||||
|
||||
`--summary` is the default; `--detail` implies `--summary` (all summary keys are always present).
|
||||
`--mismatch` and `--detail` are accepted but currently ignored with a warning on stderr.
|
||||
|
||||
---
|
||||
|
||||
## Future work
|
||||
|
||||
- **Read classification** (`--classify`): assign each read to the genome with the highest `kmer_match` score; emit as a single annotation key.
|
||||
- **Whitelist / blacklist filtering**: accept or reject sequences based on whether their k-mer match score for a designated set of genomes exceeds a threshold.
|
||||
- **`--mismatch`**: 1-mismatch approximate matching — generate `3·k` single-substitution variants per k-mer, look each up independently.
|
||||
- **`--detail`**: per-position coverage vectors (`cov_<i>`) per genome.
|
||||
- **Read classification** (`--classify`): assign each read to the genome with the highest match score.
|
||||
- **Parallelism**: activate per-partition or per-sequence worker threads using the already-parsed `--threads` value.
|
||||
- **Whitelist / blacklist filtering**: threshold-based accept/reject on per-genome match scores.
|
||||
|
||||
@@ -2,19 +2,11 @@
|
||||
|
||||
The `obiread` crate provides a streaming iterator that reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. Chunks are consumed in parallel by downstream workers.
|
||||
|
||||
## Output type: rope
|
||||
## Output type: Rope
|
||||
|
||||
Each chunk is a `Vec<Bytes>` — a **rope**: a list of reference-counted byte slices that are not necessarily contiguous in memory. The consumer iterates over the slices in order.
|
||||
Each chunk is a `Rope` — a segmented byte sequence: a `Vec` of blocks, where each block is a `Vec<Cell<u8>>`. The consumer iterates over the blocks via a forward or backward cursor.
|
||||
|
||||
Using `bytes::Bytes` means the split at the record boundary is O(1): `Bytes::split_to(n)` adjusts a reference counter, not memory. No `memcpy` in the common case.
|
||||
|
||||
## Allocation policy
|
||||
|
||||
| Case | Cost |
|
||||
|------|------|
|
||||
| Boundary found in the current block (common) | zero extra allocation — `split_to` only |
|
||||
| Boundary straddles multiple blocks (sequence > block size, rare) | one allocation to pack the rope into a flat buffer |
|
||||
| EOF flush | zero extra allocation |
|
||||
`Rope::split_off(pos)` splits at an absolute byte offset in O(log n) (binary search over block-start index). If `pos` falls inside a block, that block is split in two via `Vec::split_off` — no `memcpy` in the common case.
|
||||
|
||||
## SeqChunkIter
|
||||
|
||||
@@ -22,7 +14,7 @@ Using `bytes::Bytes` means the split at the record boundary is O(1): `Bytes::spl
|
||||
pub struct SeqChunkIter<R: Read> { /* private */ }
|
||||
|
||||
impl<R: Read> Iterator for SeqChunkIter<R> {
|
||||
type Item = io::Result<Vec<Bytes>>;
|
||||
type Item = io::Result<Rope>;
|
||||
}
|
||||
|
||||
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R>
|
||||
@@ -32,22 +24,21 @@ pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R>
|
||||
`next()` loop:
|
||||
|
||||
```text
|
||||
1. read one block of block_size bytes → push onto rope
|
||||
2. probe check: if the boundary marker ("\n>" or "\n@") is absent from the
|
||||
last block, skip the splitter (avoids a full backward scan for nothing)
|
||||
3. call splitter on last block
|
||||
if found at offset n:
|
||||
remainder = last_block.split_to(n) ← O(1), zero copy
|
||||
return std::mem::take(&mut self.rope) ← the chunk
|
||||
4. if rope.len() > 1 (multi-block accumulation):
|
||||
pack rope into one flat buffer ← one alloc
|
||||
retry splitter on flat buffer
|
||||
5. if EOF: flush remaining rope as final chunk
|
||||
1. read one block of block_size bytes → push onto Rope
|
||||
2. call splitter(rope) → Option<abs_offset>
|
||||
if Some(pos):
|
||||
tail = rope.split_off(pos) ← O(log n), may split one block
|
||||
chunk = mem::replace(&mut rope, tail)
|
||||
return Some(Ok(chunk))
|
||||
3. if EOF and rope non-empty: return Some(Ok(rope)) as final chunk
|
||||
4. if EOF and rope empty: return None
|
||||
```
|
||||
|
||||
The `Splitter` function signature is `fn(&Rope) -> Option<usize>`. It returns the absolute byte offset of the start of the last complete record, or `None` if no boundary was found in the accumulated rope (need more data).
|
||||
|
||||
## Boundary detection — FASTA
|
||||
|
||||
Backward scan with a 2-state machine. Searches for `>` immediately preceded by `\n` or `\r`:
|
||||
Backward scan with a 2-state machine. Searches (right to left) for `>` followed by `\n` or `\r` (i.e., a `>` that is preceded by a newline in forward order):
|
||||
|
||||
```mermaid
|
||||
stateDiagram-v2
|
||||
@@ -58,13 +49,13 @@ stateDiagram-v2
|
||||
FoundGt --> [*] : '\\n' / '\\r' ✓
|
||||
```
|
||||
|
||||
Returns the byte offset of the `>` that starts the last complete record.
|
||||
Returns the byte offset of the `>` that starts the last complete record. Returns `None` if only one `>` is found (cannot confirm there is a prior complete record).
|
||||
|
||||
## Boundary detection — FASTQ
|
||||
|
||||
FASTQ records have a rigid 4-line structure (`@header`, sequence, `+`, quality). The `@` character (ASCII 64, Phred score 31) can appear legitimately in quality lines, making any forward heuristic unreliable. The backward scanner verifies the full structural context before accepting a candidate `@`.
|
||||
|
||||
7-state machine (port of Go's `EndOfLastFastqEntry`), scanning from **right to left**. Each time a `+` is found, its position is saved as `restart`; any state mismatch resets the scan to that position.
|
||||
7-state machine (states 0–6), scanning from **right to left**. Each time a `+` is found, its position is saved as `restart`; any state mismatch resets the scan to that position.
|
||||
|
||||
```mermaid
|
||||
stateDiagram-v2
|
||||
|
||||
@@ -1,38 +1,57 @@
|
||||
# Kmer — implementation
|
||||
|
||||
## Memory layout
|
||||
## Types and layout
|
||||
|
||||
`Kmer` is a `#[repr(transparent)]` newtype over `u64`:
|
||||
`KmerOf<L>` is a `#[repr(transparent)]` newtype over `u64` parameterized by a `KmerLength` marker:
|
||||
|
||||
```rust
|
||||
#[repr(transparent)]
|
||||
pub struct Kmer(u64);
|
||||
pub struct KmerOf<L: KmerLength>(u64, PhantomData<L>);
|
||||
```
|
||||
|
||||
Nucleotides are packed 2 bits each, **left-aligned**, MSB-first. Nucleotide 0 occupies bits 63–62; nucleotide i occupies bits 63−2i and 62−2i. The low 64−2k bits are always zero. k is **not stored** — it is a parameter of every operation that needs it, and will be owned by the future collection-level indexer.
|
||||
Three marker types implement `KmerLength`:
|
||||
|
||||
| Marker | `len()` source | Used for |
|
||||
|--------|---------------|---------|
|
||||
| `KLen` | `params::k()` | k-mers |
|
||||
| `MLen` | `params::m()` | minimizers |
|
||||
| `ConstLen<N>` | const generic `N` | tests |
|
||||
|
||||
Public aliases:
|
||||
|
||||
```rust
|
||||
pub type Kmer = KmerOf<KLen>; // k-mer, global k
|
||||
pub type Minimizer = CanonicalKmerOf<MLen>; // canonical m-mer, global m
|
||||
```
|
||||
|
||||
Nucleotides are packed 2 bits each, **left-aligned**, MSB-first. Nucleotide 0 occupies bits 63–62; nucleotide i occupies bits 63−2i and 62−2i. The low 64−2·len bits are always zero. The length is **not stored** — every operation reads it from `L::len()`.
|
||||
|
||||
| 63–62 | 61–60 | … | 63−2(k−1)−1 to 63−2(k−1) | 63−2k down to 0 |
|
||||
|-------|-------|---|--------------------------|-----------------|
|
||||
| nt 0 | nt 1 | … | nt k−1 | zero padding |
|
||||
|
||||
## Global parameters
|
||||
|
||||
`params::set_k(k)` / `params::k()` and `params::set_m(m)` / `params::m()` are backed by `OnceLock<usize>` in production (write-once, panic on conflict) and by `thread_local! { Cell<usize> }` in test builds (per-thread, freely writable). `params::init(k, m)` sets both in one call.
|
||||
|
||||
## Encoding
|
||||
|
||||
`Kmer::from_ascii(ascii, k)` encodes the first k bytes of an ASCII slice using the shared `ENC` table (see [SuperKmer — ASCII encoding](superkmer.md#ascii-encoding-and-decoding)):
|
||||
`KmerOf::<L>::from_ascii(ascii)` encodes the first `L::len()` bytes using the shared `ENC` table (see [SuperKmer — ASCII encoding](superkmer.md#ascii-encoding-and-decoding)):
|
||||
|
||||
```rust
|
||||
for i in 0..k {
|
||||
val = (val << 2) | encode_base(ascii[i]) as u64;
|
||||
}
|
||||
Kmer(val << (64 - 2 * k))
|
||||
KmerOf(val << (64 - 2 * k), PhantomData)
|
||||
```
|
||||
|
||||
Zero allocation — result lives on the stack.
|
||||
|
||||
## Decoding
|
||||
|
||||
`write_ascii(k, buf)` appends k ASCII characters to a caller-supplied `Vec<u8>` using the shared `DEC4` table: one lookup per 4 nucleotides, two partial-byte lookups for the remainder. No allocation in the hot path.
|
||||
`write_ascii(writer)` writes k ASCII characters to any `W: Write` using the shared `DEC4` table: one lookup per 4 nucleotides, one partial lookup for the remainder. No allocation in the hot path.
|
||||
|
||||
`to_ascii(k)` is a convenience wrapper that allocates and returns a `Vec<u8>`; intended for tests and display only.
|
||||
`to_ascii()` is a convenience wrapper that allocates and returns a `Vec<u8>`; intended for tests and display only.
|
||||
|
||||
## Reverse complement
|
||||
|
||||
@@ -43,18 +62,30 @@ let x = !self.0; /
|
||||
let x = x.swap_bytes(); // reverse bytes
|
||||
let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4); // swap nibbles
|
||||
let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2); // swap 2-bit groups
|
||||
Kmer(x << (64 - 2 * k))
|
||||
KmerOf(x << (64 - 2 * k), PhantomData)
|
||||
```
|
||||
|
||||
After complementing, bytes are reversed (`swap_bytes`), then nibbles, then 2-bit groups — restoring 2-bit nucleotides to their correct positions in reverse order. A final left-shift realigns to MSB. Zero allocation — result lives on the stack.
|
||||
|
||||
## Canonical form
|
||||
## Canonical form and `CanonicalKmerOf`
|
||||
|
||||
`canonical()` returns a `CanonicalKmerOf<L>` — a distinct newtype that carries the same `u64` layout but enforces the invariant that the stored value equals `min(kmer, revcomp)`:
|
||||
|
||||
```rust
|
||||
pub fn canonical(&self, k: usize) -> Self {
|
||||
let rc = self.revcomp(k);
|
||||
if self.0 <= rc.0 { *self } else { rc }
|
||||
pub fn canonical(&self) -> CanonicalKmerOf<L> {
|
||||
let rc = self.revcomp();
|
||||
CanonicalKmerOf(if self.0 <= rc.0 { self.0 } else { rc.0 }, PhantomData)
|
||||
}
|
||||
```
|
||||
|
||||
Lexicographic minimum of forward and reverse-complement, comparing the raw `u64` values directly (left-aligned encoding makes this equivalent to nucleotide-wise comparison). Zero allocation — result lives on the stack.
|
||||
|
||||
`CanonicalKmerOf::from_raw_unchecked(raw)` is the only other public constructor, for trusted paths such as deserialisation.
|
||||
|
||||
## Sliding window helpers
|
||||
|
||||
`push_right(nuc)` / `push_left(nuc)` shift the window by one base in O(1). `is_overlapping(other)` checks whether the last k−1 nucleotides of `self` equal the first k−1 of `other`.
|
||||
|
||||
## Hashing
|
||||
|
||||
`hash_kmer(raw: u64) -> u64` computes `mix64(raw ^ 0x9e3779b97f4a7c15)`, the seeded splitmix64 finalizer. `CanonicalKmerOf::seq_hash()` delegates to `hash_kmer`.
|
||||
|
||||
@@ -26,13 +26,36 @@ Default mode is `Presence`. `Count` mode requires **all** source indexes to have
|
||||
All source indexes must satisfy:
|
||||
|
||||
- `IndexState::Indexed` (fully built — `index.done` sentinel present)
|
||||
- Same `kmer_size`, `minimizer_size`, `n_bits`
|
||||
- Same `kmer_size`, `minimizer_size`, `n_partitions`
|
||||
- Same evidence kind: all `Exact`, or all `Approx` with identical `(b, z)` parameters
|
||||
- If `Count` mode: all sources must have `with_counts=true`
|
||||
|
||||
`--force`: if the output directory already exists, it is deleted before the merge begins.
|
||||
|
||||
---
|
||||
|
||||
## Evidence compatibility
|
||||
|
||||
`validate_evidence_compat(sources)` is called before any I/O. It compares each source's `EvidenceKind` against `sources[0]`:
|
||||
|
||||
- All `Exact` → accepted, output uses `Exact`
|
||||
- All `Approx { b, z }` with same `(b, z)` → accepted, output uses those parameters
|
||||
- Any other combination → `OKIError::IncompatibleEvidence`, with a message directing the user to run `reindex` first
|
||||
|
||||
Mixed exact/approx is a hard error, not a silent conversion.
|
||||
|
||||
```rust
|
||||
fn validate_evidence_compat(sources: &[&KmerIndex]) -> OKIResult<EvidenceKind>
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Genome label deduplication
|
||||
|
||||
`compute_labels(sources, rename_duplicates)` assigns final genome labels across all sources before any file is written. The first occurrence of a label keeps the original name. Subsequent occurrences receive `.1`, `.2`, … suffixes when `rename_duplicates` is true, or trigger `OKIError::DuplicateGenomeLabel` otherwise.
|
||||
|
||||
---
|
||||
|
||||
## Algorithm
|
||||
|
||||
### 1. Validation
|
||||
@@ -41,36 +64,72 @@ Check all sources against the constraints above. Abort on any mismatch.
|
||||
|
||||
### 2. Bootstrap output from first source
|
||||
|
||||
Recursive file copy of `sources[0]` → `output`. This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genome is genome 0 in the output.
|
||||
Recursive file copy of `sources[0]` → `output`. Immediately after the copy:
|
||||
|
||||
- `index.meta` is rewritten with the final genome list (all sources, possibly renamed) and the effective evidence kind.
|
||||
- In `Presence` mode, any `counts/` directories inherited from source_0 are removed.
|
||||
- `spectrums/` from source_0 is removed and rebuilt from scratch across all sources, applying the (possibly renamed) labels.
|
||||
|
||||
This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genomes occupy columns 0 … `n_dst_genomes - 1` in the destination.
|
||||
|
||||
### 3. For each subsequent source (parallel across partitions)
|
||||
|
||||
For each partition, process one source at a time sequentially:
|
||||
`KmerPartition::merge_partition(i, sources, mode, n_dst_genomes, block_bits)` is called for each partition index `i`. `block_bits` is taken from `dst.meta.config.block_bits`.
|
||||
|
||||
**a. Classify kmers**
|
||||
Each entry in `sources` is `(&KmerPartition, n_genomes)` where `n_genomes` is the column count that source contributes (> 1 when the source is itself a merged index).
|
||||
|
||||
Iterate all kmers in the source partition (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`:
|
||||
**First merge, Presence mode**: when `n_dst_genomes == 1`, `Layer::<()>::init_presence_matrix` is called on every existing destination layer before any source column is appended. This creates `presence/col_000000.pbiv` set all-true (genome 0 is present in every slot).
|
||||
|
||||
- **Hit** `(dst_layer, slot)`: record `(dst_layer, slot, value)` where `value` is the source count (Count mode) or `1` (Presence mode).
|
||||
- **Miss**: add kmer to a `GraphDeBruijn` accumulator; record its value in a `HashMap<CanonicalKmer, Vec<u32>>`.
|
||||
**Pass 1 — classify kmers**
|
||||
|
||||
**b. Extend existing layers**
|
||||
Iterate all kmers from all source partitions (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`:
|
||||
|
||||
For each existing layer in the destination partition, call `append_genome_column` once per source genome being merged. Slots that received a hit are populated with their recorded value; all other slots receive 0 (absent).
|
||||
- **Hit**: kmer already in the destination; record for Pass 2.
|
||||
- **Miss**: push kmer into a `GraphDeBruijn` accumulator.
|
||||
|
||||
If this is the **first merge** and operating in Presence mode, call `Layer<()>::init_presence_matrix` before appending any source column. This creates `presence/` with `col_000000.pbiv` set all-true (genome 0 is present in every slot of this layer).
|
||||
**New layer construction**
|
||||
|
||||
**c. Build new layer for new kmers**
|
||||
If the accumulator is non-empty, compute de Bruijn unitigs and call `Layer::<()>::build(&new_layer_dir, block_bits)`. All kmers absent from the destination — across **all** sources — accumulate into a **single** graph, producing one new layer per merge operation (not one per source).
|
||||
|
||||
If the `GraphDeBruijn` accumulator is non-empty (misses exist), write compact unitigs from the de Bruijn graph, then call the appropriate `Layer::build` variant. Before appending the source's own column, prepend `n_existing_genomes` absent columns (value 0) — one per genome already in the index — so the column count invariant holds immediately after layer creation.
|
||||
**Pass 2 — fill column builders**
|
||||
|
||||
**d. Update partition metadata**
|
||||
For each source and each of its layers, re-iterate unitigs and look up stored values via `SrcLayerData::lookup(kmer, src_n)`:
|
||||
|
||||
Write updated `meta.json` with the incremented `n_layers` if a new layer was added.
|
||||
- `SrcLayerData::SetMembership` — no data directory exists; every kmer returns `vec![1; n_genomes]`
|
||||
- `SrcLayerData::Presence` — reads `PersistentBitMatrix` from `presence/`
|
||||
- `SrcLayerData::Count` — reads `PersistentCompactIntMatrix` from `counts/`
|
||||
|
||||
Hits are routed to `exist_builders[dst_layer][src_col]`; misses are routed to `new_src_builders[src_col]`.
|
||||
|
||||
**Column prepending for new layers**
|
||||
|
||||
Before source columns are written to the new layer, `n_dst_genomes` absent columns (all-zero / all-false) are prepended — one per genome already in the index — so the column count invariant holds immediately after layer creation.
|
||||
|
||||
**Close and update metadata**
|
||||
|
||||
Close all builders; update `presence/meta.json` or `counts/meta.json` with `{"n": N, "n_cols": n_dst_genomes + n_src_total}`; increment `PartitionMeta::n_layers` if a new layer was added.
|
||||
|
||||
### 4. Update index metadata
|
||||
|
||||
Append the merged source's genome label(s) to `index.meta.genomes`. Write updated `index.meta`.
|
||||
`index.meta` was already written during bootstrap with the complete genome list and evidence kind. No further update is needed after the partition loop.
|
||||
|
||||
---
|
||||
|
||||
## `append_genome_column`
|
||||
|
||||
Defined on two concrete specialisations of `Layer<D>`:
|
||||
|
||||
```rust
|
||||
impl Layer<PersistentCompactIntMatrix> {
|
||||
pub fn append_genome_column(layer_dir: &Path, value_of: impl Fn(usize) -> u32) -> OLMResult<()>
|
||||
}
|
||||
|
||||
impl Layer<PersistentBitMatrix> {
|
||||
pub fn append_genome_column(layer_dir: &Path, value_of: impl Fn(usize) -> bool) -> OLMResult<()>
|
||||
}
|
||||
```
|
||||
|
||||
Each appends one column file to the matrix subdirectory (`counts/` or `presence/`). In `merge_partition`, columns are written directly via `PersistentBitVecBuilder` / `PersistentCompactIntVecBuilder` rather than through these helpers, but the invariant they enforce is the same.
|
||||
|
||||
---
|
||||
|
||||
@@ -78,27 +137,31 @@ Append the merged source's genome label(s) to `index.meta.genomes`. Write update
|
||||
|
||||
After any merge, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the total genome count in the index at that point.
|
||||
|
||||
This is maintained by three mechanisms:
|
||||
Maintained by three mechanisms:
|
||||
|
||||
1. **Existing layers**: one column appended per source genome (`append_genome_column`).
|
||||
2. **New layers created during merge**: `n_existing_genomes` absent columns prepended before the source's own column.
|
||||
3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0 before any source column is appended.
|
||||
1. **Existing layers**: `n_src_total` columns appended (one per source genome).
|
||||
2. **New layers created during merge**: `n_dst_genomes` absent columns prepended before source columns.
|
||||
3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0.
|
||||
|
||||
The invariant is a precondition of the `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count.
|
||||
The invariant is a precondition of `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count.
|
||||
|
||||
---
|
||||
|
||||
## New layer construction
|
||||
## Error variants relevant to merge
|
||||
|
||||
All kmers absent from the destination index — across **all** sources being merged — accumulate into a **single** `GraphDeBruijn` per partition. One new layer is built from this graph, not one layer per source. This keeps the layer count bounded and preserves compact unitig representation.
|
||||
|
||||
De Bruijn reconstruction ensures that overlapping k-1 suffixes/prefixes from different source kmers are merged into minimal unitigs before MPHF construction.
|
||||
| Variant | Condition |
|
||||
|---|---|
|
||||
| `OKIError::NotIndexed(path)` | Source not in `Indexed` state |
|
||||
| `OKIError::IncompatibleConfig` | Mismatched `kmer_size`, `minimizer_size`, or `n_partitions` |
|
||||
| `OKIError::MismatchedMode` | Count mode but a source has `with_counts=false` |
|
||||
| `OKIError::IncompatibleEvidence(msg)` | Mixed exact/approx or different approx `(b, z)` |
|
||||
| `OKIError::DuplicateGenomeLabel(label)` | Duplicate label and `rename_duplicates=false` |
|
||||
|
||||
---
|
||||
|
||||
## On-disk impact
|
||||
|
||||
After merging `G` genomes (1 existing + G-1 new sources):
|
||||
After merging `G` genomes (sources_0 contributes `G0`, subsequent sources the rest):
|
||||
|
||||
```
|
||||
partitions/
|
||||
@@ -106,28 +169,28 @@ partitions/
|
||||
index/
|
||||
meta.json ← n_layers updated if new layer added
|
||||
layer_0/
|
||||
mphf.bin ← unchanged (MPHF immutable)
|
||||
mphf.bin ← unchanged
|
||||
unitigs.bin ← unchanged
|
||||
evidence.bin ← unchanged
|
||||
presence/ ← created on first merge (Presence mode)
|
||||
meta.json {"n": N, "n_cols": G}
|
||||
col_000000.pbiv ← all-true (genome 0)
|
||||
col_000001.pbiv ← source 1 presence
|
||||
col_000000.pbiv ← all-true (genome 0 … G0-1)
|
||||
col_000001.pbiv ← next source
|
||||
...
|
||||
counts/ ← extended (Count mode)
|
||||
meta.json {"n": N, "n_cols": G}
|
||||
col_000000.pciv ← genome 0 counts (from original build)
|
||||
col_000001.pciv ← source 1 counts
|
||||
col_000001.pciv ← next source
|
||||
...
|
||||
layer_1/ ← new layer (if new kmers found)
|
||||
layer_N/ ← new layer (if new kmers found)
|
||||
mphf.bin
|
||||
unitigs.bin
|
||||
evidence.bin
|
||||
presence/ or counts/
|
||||
meta.json {"n": N1, "n_cols": G}
|
||||
col_000000.pbiv ← all-false (genome 0 absent from this layer)
|
||||
col_000000.pbiv ← all-false (absent for existing genomes)
|
||||
...
|
||||
col_000001.pbiv ← source 1 presence in this new layer
|
||||
spectrums/
|
||||
<label>.json ← one file per genome, rebuilt from all sources
|
||||
index.meta ← complete genome list + evidence kind written at bootstrap
|
||||
```
|
||||
|
||||
The `index.meta` genome list grows from 1 entry to G entries after all sources are merged.
|
||||
|
||||
@@ -27,10 +27,10 @@ part_XXXXX/
|
||||
|
||||
After filtering (applying a min-count threshold derived from the spectrum) and building the local De Bruijn graph + unitigs (see [Construction pipeline](pipeline.md)), the exact filtered kmer set is available via `unitigs.bin`.
|
||||
|
||||
`MphfLayer::build` is called on the unitig file:
|
||||
`MphfLayer::build(dir, block_bits, fill_slot)` is called on the unitig directory:
|
||||
|
||||
1. **Pass 1**: iterate all canonical kmers from `unitigs.bin` in parallel, build and store `mphf.bin` (ptr_hash).
|
||||
2. **Pass 2**: iterate sequentially, fill `evidence.bin`, call the mode-specific `fill_slot` callback.
|
||||
1. **Pass 1**: build `.idx` via `build_unitig_idx(unitig_path, block_bits)`, then iterate all canonical kmers in parallel over chunks using `(0..unitigs.len()).into_par_iter()` + `unitigs.unitig(ci).into_canonical_kmers()`. Constructs and stores `mphf.bin` (ptr_hash, `new_from_par_iter`).
|
||||
2. **Pass 2**: iterate sequentially with `iter_indexed_canonical_kmers`; fill `evidence.bin`; call `fill_slot(slot, kmer)` callback once per kmer for DataStore population.
|
||||
|
||||
`mphf1.bin` and `counts1.bin` are no longer needed after phase 2 and can be deleted.
|
||||
|
||||
@@ -105,9 +105,12 @@ Each layer is a self-contained unit. See [obilayeredmap](obilayeredmap.md) for t
|
||||
|
||||
```
|
||||
layer_i/
|
||||
unitigs.bin — packed 2-bit nucleotide sequences (kmer evidence)
|
||||
unitigs.bin — packed 2-bit nucleotide sequences (kmer evidence source)
|
||||
unitigs.bin.idx — random-access block index (block_bits controls granularity)
|
||||
mphf.bin — ptr_hash phase-2 MPHF
|
||||
evidence.bin — n × u32: (chunk_id: 25 bits | rank: 7 bits) per slot
|
||||
evidence.bin — n × (chunk_id: 25 bits | rank: 7 bits) per slot [exact mode]
|
||||
fingerprint.bin — n × b-bit fingerprints per slot [approx mode]
|
||||
layer_meta.json — evidence kind, recorded at build time
|
||||
```
|
||||
|
||||
Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0 is built from dataset A. Adding dataset B:
|
||||
@@ -116,9 +119,45 @@ Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0
|
||||
2. Collect kmers of B not present in any layer → set `B \ A`.
|
||||
3. Build layer 1 from `B \ A` (dereplicate → count → De Bruijn → unitigs → `MphfLayer::build`).
|
||||
|
||||
### Evidence modes
|
||||
|
||||
Two evidence modes are supported, selected at build time via `EvidenceKind` and recorded in `layer_meta.json`.
|
||||
|
||||
**Exact** (`EvidenceKind::Exact`): `evidence.bin` stores one `(chunk_id, rank)` pair per MPHF slot, encoding the position of the corresponding kmer in `unitigs.bin`. Membership verification reconstructs the kmer from `(chunk_id, rank)` and compares it to the query. Zero false positives. Requires `.idx` for random access.
|
||||
|
||||
**Approx** (`EvidenceKind::Approx { b, z }`): `fingerprint.bin` stores a `b`-bit hash of the kmer at each MPHF slot. Membership check compares `kmer.seq_hash()` against the stored fingerprint. False-positive rate: 1/2^b per query. No `.idx` is written or needed.
|
||||
|
||||
### Build functions
|
||||
|
||||
```
|
||||
MphfLayer::build(dir, block_bits, fill_slot)
|
||||
Pass 1: par_iter over chunks via .idx → build mphf.bin
|
||||
Pass 2: sequential iter → fill evidence.bin + call fill_slot
|
||||
|
||||
MphfLayer::build_exact_evidence(dir, block_bits)
|
||||
Standalone post-hoc: builds evidence.bin + .idx from existing mphf.bin + unitigs.bin
|
||||
Uses open_sequential(); no .idx required on entry
|
||||
|
||||
MphfLayer::build_approx_evidence(dir, b, z)
|
||||
Standalone post-hoc: builds fingerprint.bin from existing mphf.bin + unitigs.bin
|
||||
Uses open_sequential(); never writes .idx
|
||||
|
||||
MphfLayer::build_evidence(dir, kind, block_bits)
|
||||
Dispatch wrapper: routes to build_exact_evidence or build_approx_evidence
|
||||
```
|
||||
|
||||
`build` always produces exact evidence. If approximate evidence is needed (e.g. `EvidenceKind::Approx`), the caller invokes `build_approx_evidence` after `build` to replace the evidence bundle.
|
||||
|
||||
In `obikpartitionner`, `build_index_layer` receives `block_bits: u8` from `IndexConfig::block_bits` and forwards it directly to `Layer::build` and `Layer::build_approx_evidence`.
|
||||
|
||||
### Membership verification
|
||||
|
||||
ptr_hash maps any input to a valid slot — it does not natively detect absent keys. Membership is verified using the evidence entry: decode the kmer from `(chunk_id, rank)` and compare to the query. A mismatch means the kmer is absent from this layer; probe the next layer.
|
||||
ptr_hash maps any input to a valid slot — it does not natively detect absent keys. Membership is verified using the evidence entry:
|
||||
|
||||
- **Exact**: decode `(chunk_id, rank)` from `evidence.bin`; reconstruct the kmer via `unitigs.verify_canonical_kmer`; compare to query.
|
||||
- **Approx**: compare `kmer.seq_hash()` to the b-bit fingerprint stored at the slot.
|
||||
|
||||
A mismatch in either mode means the kmer is absent from this layer; probe the next layer.
|
||||
|
||||
### Query algorithm
|
||||
|
||||
@@ -126,12 +165,12 @@ ptr_hash maps any input to a valid slot — it does not natively detect absent k
|
||||
fn query(kmer) → Option<(layer_index, slot)>:
|
||||
for (i, layer) in layers.iter().enumerate():
|
||||
slot = layer.mphf.index(kmer)
|
||||
if layer.evidence.decode(slot) matches kmer:
|
||||
if layer.evidence.matches(slot, kmer): // exact or approx dispatch
|
||||
return Some((i, slot))
|
||||
return None
|
||||
```
|
||||
|
||||
Expected probe depth: 1 for kmers in layer 0. Each probe is a ptr_hash lookup (~10 ns) plus one evidence decode.
|
||||
`MphfLayer::find` dispatches transparently to `find_exact` or `find_approx` based on the evidence loaded at `open` time. Expected probe depth: 1 for kmers in layer 0. Each probe is a ptr_hash lookup (~10 ns) plus one evidence check.
|
||||
|
||||
### Merging layers
|
||||
|
||||
|
||||
@@ -1,16 +1,26 @@
|
||||
# obipipeline — parallel pipeline library
|
||||
|
||||
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
|
||||
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **stages**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
|
||||
|
||||
## Core types
|
||||
|
||||
| Type alias | Rust type | Role |
|
||||
|---|---|---|
|
||||
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send+Sync>` | Called repeatedly; `FnMut` because it holds iterator state |
|
||||
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send+Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) |
|
||||
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send+Sync>` | Final consumer; returns `Result` so errors propagate back |
|
||||
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send>` | Called repeatedly; `FnMut` because it holds iterator state |
|
||||
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | 1→1 transform shared across workers via `Arc::clone` |
|
||||
| `SharedFlatFn<D>` | `Arc<dyn Fn(D, &Sender<Result<D, _>>, &Sender<isize>) + Send + Sync>` | 1→N transform; pushes items into channel, sends delta |
|
||||
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send>` | Final consumer; returns `Result` so errors propagate back |
|
||||
|
||||
`Pipeline<D>` holds one `SourceFn`, a `Vec<SharedFn>`, and one `SinkFn`.
|
||||
Stages come in two variants:
|
||||
|
||||
```rust
|
||||
pub enum Stage<D> {
|
||||
Transform(SharedFn<D>), // 1→1
|
||||
Flat(SharedFlatFn<D>), // 1→N
|
||||
}
|
||||
```
|
||||
|
||||
`Pipeline<D>` holds one `SourceFn`, a `Vec<Stage>`, and one `SinkFn`.
|
||||
`WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
|
||||
|
||||
## WorkerPool
|
||||
@@ -23,7 +33,7 @@ WorkerPool::run(self)
|
||||
| Parameter | Role |
|
||||
|---|---|
|
||||
| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
|
||||
| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
|
||||
| `capacity` | Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
|
||||
|
||||
`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
|
||||
|
||||
@@ -43,7 +53,7 @@ Each variant carries the concrete type for one stage's output. The macros patter
|
||||
|
||||
## Macros
|
||||
|
||||
Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
|
||||
Eight low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
|
||||
|
||||
### Low-level
|
||||
|
||||
@@ -54,6 +64,9 @@ make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T
|
||||
make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
|
||||
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
|
||||
|
||||
make_flat_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> impl IntoIterator<Item=U>
|
||||
make_flat_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<impl IntoIterator<Item=U>, E>
|
||||
|
||||
make_sink!(Enum, func, InputVariant) // func: T -> ()
|
||||
make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
|
||||
```
|
||||
@@ -66,8 +79,10 @@ Each macro wraps the closure in the correct smart pointer (`Box` for source/sink
|
||||
make_pipeline! {
|
||||
DataEnum,
|
||||
source iterator => OutputVariant, // or source? for fallible
|
||||
| func: In => Out, // non-fallible transform
|
||||
|? func: In => Out, // fallible transform
|
||||
| func: In => Out, // 1→1 non-fallible transform
|
||||
|? func: In => Out, // 1→1 fallible transform
|
||||
|| func: In => Out, // 1→N non-fallible flat transform
|
||||
||? func: In => Out, // 1→N fallible flat transform
|
||||
sink func @ InputVariant, // or sink? for fallible
|
||||
}
|
||||
```
|
||||
@@ -75,12 +90,29 @@ make_pipeline! {
|
||||
`?` marks fallibility on source, individual transforms, or sink independently.
|
||||
Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
|
||||
|
||||
### make_pipe! DSL
|
||||
|
||||
`make_pipe!` builds a sourceless/sinkless `Pipe<D, In, Out>` — a reusable, composable stage sequence:
|
||||
|
||||
```
|
||||
make_pipe! {
|
||||
DataEnum : InType => OutType,
|
||||
| func: InVariant => OutVariant,
|
||||
|? func: InVariant => OutVariant,
|
||||
|| func: InVariant => OutVariant,
|
||||
||? func: InVariant => OutVariant,
|
||||
}
|
||||
```
|
||||
|
||||
Two pipes compose with `.then(other)`. Apply to an iterator with `.apply(iter, n_workers, capacity)` to get a `PipeIter<Out>` — an iterator over the pipeline output, backed by a background `WorkerPool`. The scatter step in `obikmer` uses `make_pipe!` and `.apply()` rather than the full `make_pipeline!` / `WorkerPool` pattern.
|
||||
|
||||
## Scheduler architecture
|
||||
|
||||
```
|
||||
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
|
||||
▲ │
|
||||
[stage_rxs] ────────┘◄──────────────────────────────┘
|
||||
[flat_delta_rx] ──► Scheduler (in_flight adjustment)
|
||||
│
|
||||
[sink_err_rx] ← errors from sink (highest priority)
|
||||
│
|
||||
@@ -91,23 +123,23 @@ The scheduler is a single thread running a biased `Select` over all input channe
|
||||
|
||||
```
|
||||
index 0 sink_err_rx abort on sink error
|
||||
index 1 stage_rxs[N-1] drain last stage first
|
||||
...
|
||||
index N stage_rxs[0]
|
||||
index N+1 source_rx pull new data last
|
||||
index 1 flat_delta_rx adjust in_flight before dispatching
|
||||
index 2..=n+1 stage_rxs[n-1..0] drain last stage first
|
||||
index n+2 source_rx pull new data last
|
||||
```
|
||||
|
||||
This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
|
||||
|
||||
**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.
|
||||
**Workers** are generic: each receives a `WorkerTask` — either `Transform(data, stage_idx)` or `Flat(data, stage_idx)`. For `Transform`, the worker calls `f(data)` and sends the result to `stage_txs[stage_idx]`. For `Flat`, the worker calls `f(data, &push_tx, &delta_tx)`: the closure pushes N items into `push_tx` then sends `N-1` to `delta_tx`. The scheduler uses the delta to adjust `in_flight` without knowing N in advance.
|
||||
|
||||
**Termination** uses an `in_flight` counter:
|
||||
**Termination** uses an `in_flight: isize` counter and a `flat_workers_active: usize` counter:
|
||||
|
||||
- incremented when an item is dispatched from source to workers
|
||||
- decremented when the item exits the last stage
|
||||
- the loop exits only when `source_done && in_flight == 0`
|
||||
- `in_flight` incremented when an item is dispatched from source to workers
|
||||
- `in_flight` decremented when the item exits the last stage to the sink
|
||||
- `flat_workers_active` incremented when a `Flat` task is dispatched, decremented when the delta arrives
|
||||
- the loop exits only when `source_done && in_flight == 0 && flat_workers_active == 0`
|
||||
|
||||
This guarantees all in-flight items complete before `join()`.
|
||||
This guarantees all in-flight items complete (including all N outputs of a flat stage) before `join()`.
|
||||
|
||||
## Error handling
|
||||
|
||||
@@ -118,7 +150,7 @@ This guarantees all in-flight items complete before `join()`.
|
||||
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
|
||||
| `TypeMismatch` | Wrong enum variant arrived at a stage |
|
||||
| `StepKindMismatch` | Internal routing error |
|
||||
| `StepError(Box<dyn Error>)` | Error from user code (wrapped by `make_*_fallible!`) |
|
||||
| `StepError(Box<dyn Error + Send + Sync>)` | Error from user code (wrapped by `make_*_fallible!`) |
|
||||
|
||||
Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
|
||||
|
||||
|
||||
@@ -88,9 +88,7 @@ Implemented as a three-step pipeline in `count_partition()`:
|
||||
2. **Provisional MPHF** (ptr_hash): built from `sorted_unique.bin` via `new_from_par_iter(f0, ...)`. Stored to `mphf1.bin`; `sorted_unique.bin` deleted immediately.
|
||||
3. **Accumulation pass**: re-read dereplicated superkmers; for each kmer, `slot = mphf.index(kmer.raw())`, increment `counts1[slot]` by the superkmer COUNT. Stored in a `PersistentCompactIntVec` (`counts1.bin`).
|
||||
|
||||
At the end of this phase, each distinct canonical kmer has its exact total count, and the frequency spectrum (`kmer_spectrum_raw.json`) is written.
|
||||
|
||||
Abundance filter applied here: kmers with `total_count < q` are discarded. `q` is a collection parameter (0 = keep all, including singletons for ≤1x data).
|
||||
At the end of this phase, each distinct canonical kmer has its exact total count, and the frequency spectrum (`spectrums/{label}.json`) is written to the index root.
|
||||
|
||||
No pre-filter on super-kmer COUNT is possible at phase 2: a super-kmer with COUNT=1 may contain only high-abundance kmers, each present in many other super-kmers across the partition.
|
||||
|
||||
@@ -140,19 +138,70 @@ Output: `unitigs.bin` — the permanent evidence structure for the partition. Ea
|
||||
|
||||
## Phase 6 — MPHF construction and index finalisation
|
||||
|
||||
Built once on the definitive kmer set (all kmers in all unitigs of the partition). See [obilayeredmap](obilayeredmap.md) and [MPHF selection](mphf.md) for the current implementation.
|
||||
`build_index_layer` is called per partition (in parallel via `build_layers`) with the following parameters sourced from `IndexConfig`:
|
||||
|
||||
- `block_bits` — from `IndexConfig::block_bits`; controls the `.idx` block size (2^block_bits unitig chunks per block) for exact evidence
|
||||
- `evidence` — `EvidenceKind::Exact` or `EvidenceKind::Approx { b, z }`; propagated unchanged from `IndexConfig::evidence`
|
||||
- `min_ab` / `max_ab` — abundance bounds applied before graph construction
|
||||
- `with_counts` — whether to store kmer counts alongside set membership
|
||||
|
||||
**Abundance filtering:** when `min_ab > 1` or `max_ab.is_some()`, the provisional `mphf1.bin` and `counts1.bin` produced in phase 3 are memory-mapped. Each canonical kmer is accepted only if its count in `counts1` satisfies the bounds. If either file is absent, filtering is skipped (all kmers accepted).
|
||||
|
||||
```
|
||||
kmers from unitigs → MPHF → mphf.bin
|
||||
→ evidence.bin : n × u32, each = (chunk_id: 25 bits | rank: 7 bits)
|
||||
→ payload : counts/ (mode 2) or presence/ (mode 3)
|
||||
for each kmer in dereplicated super-kmer:
|
||||
ab = counts1[mphf1.index(kmer.raw())]
|
||||
if ab < min_ab || ab > max_ab: skip
|
||||
graph.push(kmer)
|
||||
```
|
||||
|
||||
The MPHF is built in two passes over `unitigs.bin`: parallel pass for `mphf.bin`, sequential pass for `evidence.bin` and payload. The exact kmer count is available from the unitig index (`unitigs.bin.idx`) before the passes begin.
|
||||
**Graph build and unitig write:**
|
||||
|
||||
**Exact verification via unitig evidence:**
|
||||
The surviving kmers are fed into `GraphDeBruijn`, which computes degrees and yields unitigs. Unitigs are written to `layer_0/unitigs.bin` via a `UnitigFileWriter`.
|
||||
|
||||
`unitigs.bin` serves as the evidence structure. The MPHF maps every input to `[0, N)` including absent kmers — the unitig read-back (via `evidence.bin`) is the only correct membership test.
|
||||
**MPHF and evidence build:**
|
||||
|
||||
`Layer::build` (membership-only) or `Layer::<PersistentCompactIntMatrix>::build` (with counts) is called next. Internally, `MphfLayer::build` performs two passes:
|
||||
|
||||
1. **Pass 1 (parallel):** build `unitigs.bin.idx` (block size = 2^`block_bits`) then construct the MPHF from all canonical kmers in `unitigs.bin`; store to `mphf.bin`.
|
||||
2. **Pass 2 (sequential):** for each kmer in `unitigs.bin`, compute its slot and write `evidence.bin` (`chunk_id: 25 bits | rank: 7 bits` packed into a `u32`); also invoke the payload callback (`fill_slot`) to populate `counts/` if `with_counts`.
|
||||
|
||||
After `Layer::build` completes, `layer_meta.json` records `EvidenceKind::Exact`.
|
||||
|
||||
**Approximate evidence override:**
|
||||
|
||||
If `evidence` is `EvidenceKind::Approx { b, z }`, `build_approx_evidence` is called immediately after `Layer::build`. It overwrites the exact evidence bundle with `fingerprint.bin` (b-bit hash per slot) and rewrites `layer_meta.json` with `EvidenceKind::Approx { b, z }`. No `.idx` file is needed at query time in this mode.
|
||||
|
||||
```
|
||||
// Exact path → evidence.bin + unitigs.bin.idx + layer_meta.json(Exact)
|
||||
// Approx path → fingerprint.bin + layer_meta.json(Approx{b,z})
|
||||
// (evidence.bin left on disk but not used)
|
||||
```
|
||||
|
||||
**Partition metadata:**
|
||||
|
||||
After all layer files are written, `PartitionMeta { n_layers: 1 }` is serialised to `index/meta.json` inside the partition directory. This file is required by `LayeredMap::open` for subsequent merge operations.
|
||||
|
||||
**File layout per partition after phase 6:**
|
||||
|
||||
```
|
||||
part_XXXXX/
|
||||
index/
|
||||
meta.json ← PartitionMeta { n_layers: 1 }
|
||||
layer_0/
|
||||
unitigs.bin ← permanent evidence (all modes)
|
||||
unitigs.bin.idx ← block index (exact mode only)
|
||||
mphf.bin ← MPHF
|
||||
evidence.bin ← exact evidence (exact mode)
|
||||
fingerprint.bin ← b-bit fingerprints (approx mode)
|
||||
layer_meta.json ← EvidenceKind tag
|
||||
counts/ ← PersistentCompactIntMatrix (with_counts only)
|
||||
```
|
||||
|
||||
**Cleanup:** unless `--keep-intermediate` is set, `remove_build_artifacts` deletes `dereplicated.skmer.zst`, `mphf1.bin`, and `counts1.bin` after all partitions are indexed.
|
||||
|
||||
See [obilayeredmap](obilayeredmap.md) and [MPHF selection](mphf.md) for data structure details.
|
||||
|
||||
**Query path (exact evidence):**
|
||||
|
||||
```
|
||||
query kmer q
|
||||
@@ -164,4 +213,12 @@ query kmer q
|
||||
→ no match: kmer absent ← MPHF collision on absent kmer
|
||||
```
|
||||
|
||||
`superkmers.bin.gz` is no longer needed at this point and can be deleted.
|
||||
**Query path (approximate evidence):**
|
||||
|
||||
```
|
||||
query kmer q
|
||||
→ MPHF(q) → slot s
|
||||
→ fingerprint[s] matches seq_hash(q)?
|
||||
→ yes : probable hit (FP rate = 1/2^b per kmer, 1/2^(b·z) per z-window)
|
||||
→ no : kmer absent
|
||||
```
|
||||
|
||||
@@ -2,36 +2,47 @@
|
||||
|
||||
## Memory layout
|
||||
|
||||
A super-kmer is stored as a **32-bit header** followed by a **byte-aligned nucleotide sequence** (2 bits/base, nucleotide 0 at the MSB of the first byte):
|
||||
|
||||
| Field | Bits | Role |
|
||||
|-------|------|------|
|
||||
| COUNT | 24 | Occurrence count (≤ 16 M) |
|
||||
| NKMERS | 8 | Number of kmers (= seq_length − k + 1, range 1–255) |
|
||||
|
||||
Bit layout (MSB to LSB): `[31:8] COUNT [7:0] NKMERS`
|
||||
|
||||
NKMERS is stored as a raw `u8` in **kmer units**, not nucleotides. The nucleotide length is recovered as `NKMERS + k − 1`. This avoids the awkward wrapping convention (`0 = 256`) that would be needed if nucleotide length were stored directly, and gains k−1 = 30 units of headroom:
|
||||
|
||||
| unit | u8 covers | max nucleotides |
|
||||
|---|---|---|
|
||||
| nucleotides | 255 nt | 225 kmers |
|
||||
| **kmers** | **255 kmers** | **285 nt** |
|
||||
|
||||
The public accessors:
|
||||
`SuperKmer` holds two separate fields:
|
||||
|
||||
```rust
|
||||
fn n_kmers(&self) -> usize { (self.0 & 0xFF) as usize }
|
||||
fn seql(&self) -> usize { self.n_kmers() + K - 1 }
|
||||
fn count(&self) -> u32 { self.0 >> 8 }
|
||||
fn increment(&mut self) { self.0 += 1 << 8; }
|
||||
fn add(&mut self, n: u32) { self.0 += n << 8; }
|
||||
fn set_count(&mut self, n: u32) { self.0 = (self.0 & 0xFF) | (n << 8); }
|
||||
pub struct SuperKmer {
|
||||
pub(crate) count: u32,
|
||||
pub(crate) inner: PackedSeq,
|
||||
}
|
||||
```
|
||||
|
||||
In practice, observed super-kmer lengths on metagenomic data (k=31) are below 55 nucleotides (≤ 25 kmers) — far from the 255-kmer cap. If a super-kmer ever exceeds 255 kmers, it is split with a k−1 nucleotide overlap, preserving all kmers without duplication (identical mechanism to partition-boundary splits).
|
||||
`PackedSeq` stores a 2-bit packed DNA sequence as a heap-allocated `Box<[u8]>` plus a `tail: u8` field:
|
||||
|
||||
The sequence is always stored in canonical form (lexicographic minimum of forward and reverse complement), with nucleotide 0 at the MSB of the first byte. The byte array can be hashed directly without any adjustment.
|
||||
| Field | Type | Role |
|
||||
|-------|------|------|
|
||||
| `tail` | `u8` | Number of valid nucleotides in the last byte: 0 encodes 4, 1–3 are identity |
|
||||
| `seq` | `Box<[u8]>` | 2-bit packed bytes, nucleotide 0 at bits 7–6 of `seq[0]` |
|
||||
|
||||
Nucleotide length is recovered without storing it explicitly:
|
||||
|
||||
```text
|
||||
seql = (seq.len() - 1) * 4 + tail_count(tail)
|
||||
```
|
||||
|
||||
There is no packed header word — `count` and the sequence live in separate fields.
|
||||
|
||||
The on-disk binary format (produced by `write_to_binary`) is:
|
||||
|
||||
```text
|
||||
[varint(count)] [u8: seql − k] [packed bytes…]
|
||||
```
|
||||
|
||||
`seql − k` fits in a `u8` when `n_kmers = seql − k + 1 ≤ MAX_KMERS_PER_CHUNK (= 256)`. If a super-kmer exceeds 256 kmers, `write_to_binary` splits it into overlapping chunks (k−1 nucleotide overlap, same count per chunk), each a self-contained record readable by `read_from_binary`.
|
||||
|
||||
The public accessors operate on the struct fields directly:
|
||||
|
||||
```rust
|
||||
fn seql(&self) -> usize { self.inner.seql() }
|
||||
fn count(&self) -> u32 { self.count }
|
||||
fn increment(&mut self) { self.count += 1; }
|
||||
fn add(&mut self, n: u32) { self.count += n; }
|
||||
fn set_count(&mut self, n: u32) { self.count = n; }
|
||||
```
|
||||
|
||||
## ASCII encoding and decoding
|
||||
|
||||
@@ -72,7 +83,7 @@ const fn revcomp4(x: u8) -> u8 {
|
||||
**Step 2 — realignment.** After step 1, `padding = n × 8 − seql × 2` spurious bits (complements of the original padding A's) appear at the start of the array. They are flushed left using `BitSlice<u8, Msb0>::rotate_left(padding)` from the `bitvec` crate, which is SIMD-accelerated. The trailing `padding` bits are then zeroed:
|
||||
|
||||
```rust
|
||||
let seql = self.n_kmers() + k - 1;
|
||||
let seql = self.seql();
|
||||
shift = n * 8 - seql * 2 // number of padding bits
|
||||
bits.rotate_left(shift)
|
||||
bits[len - shift..].fill(false)
|
||||
@@ -93,7 +104,7 @@ bits[len - shift..].fill(false)
|
||||
|
||||
## Minimizer sliding window
|
||||
|
||||
Super-kmers are built by `SuperKmerIter` (crate `obiskbuilder`), which maintains the current minimizer with a **monotonic deque** over a sliding window of W = k − m + 1 m-mer positions.
|
||||
Super-kmers are built by `SuperKmerIter` (crate `obiskbuilder`), which tracks the current minimizer with a **monotonic deque** (`Ring<MmerItem, 32>`) inside `RollingStat`, a rolling-window entropy and minimizer tracker.
|
||||
|
||||
Each deque entry stores:
|
||||
|
||||
@@ -101,20 +112,9 @@ Each deque entry stores:
|
||||
|------------|-------|----------------------------------------------|
|
||||
| `position` | usize | 0-based start of this m-mer in the segment |
|
||||
| `canonical`| u64 | right-aligned canonical m-mer value (lex-min of fwd and rc); used as partition key |
|
||||
| `hash` | u64 | $H(\text{canonical})$ — ordering key for random minimizer selection |
|
||||
| `hash` | u64 | `hash_kmer(canonical << (64 − 2m))` — ordering key for random minimizer selection |
|
||||
|
||||
The hash $H$ is the seeded splitmix64 finalizer (see [Minimizer selection](../theory/minimizer.md)):
|
||||
|
||||
```rust
|
||||
fn hash_mmer(canonical: u64) -> u64 {
|
||||
let x = canonical ^ 0x9e3779b97f4a7c15; // seed: eliminates fixed point at 0
|
||||
let x = x ^ (x >> 30);
|
||||
let x = x.wrapping_mul(0xbf58476d1ce4e5b9);
|
||||
let x = x ^ (x >> 27);
|
||||
let x = x.wrapping_mul(0x94d049bb133111eb);
|
||||
x ^ (x >> 31)
|
||||
}
|
||||
```
|
||||
The hash uses the seeded splitmix64 finalizer (`mix64(raw ^ 0x9e3779b97f4a7c15)`), the same function as `kmer::hash_kmer`.
|
||||
|
||||
On each new nucleotide, once the window is full, the deque is updated:
|
||||
|
||||
@@ -133,24 +133,27 @@ On each new nucleotide, once the window is full, the deque is updated:
|
||||
|
||||
The front of the deque is always the current minimizer. Because the deque is maintained in strictly increasing hash order, each entry is popped at most once — O(1) amortized per nucleotide.
|
||||
|
||||
A super-kmer boundary is emitted when the minimizer changes: `deque.front.hash ≠ prev_hash`. The `canonical` field of the front entry is **not** used for boundary detection — that uses the hash alone. The canonical value is stored so that the partition key $H(\text{canonical})$ can be recomputed independently at routing time from the stored `minimizer_pos`, without inheriting the minimum-order-statistic bias (see [Minimizer selection — partition key independence](../theory/minimizer.md#partition-key-independence)).
|
||||
A super-kmer boundary is emitted when the minimizer changes: `current_minimizer != prev_minimizer`. `SuperKmerIter` also emits a boundary when:
|
||||
|
||||
- entropy of the current k-mer falls at or below the threshold θ (cursor retreated by k−1)
|
||||
- super-kmer length reaches 256 nucleotides (cursor retreated by k)
|
||||
|
||||
## Kmer extraction
|
||||
|
||||
A k-mer is extracted from a super-kmer with `SuperKmer::kmer(i, k)`, which returns a `Kmer` — a left-aligned `u64` newtype (see [Kmer implementation](kmer.md)):
|
||||
A k-mer is extracted from a super-kmer with `SuperKmer::kmer(i)`, which delegates to `PackedSeq::extract::<KLen>(i)` and returns a `Kmer` — a left-aligned `u64` newtype (see [Kmer implementation](kmer.md)):
|
||||
|
||||
```rust
|
||||
pub fn kmer(&self, i: usize, k: usize) -> Result<Kmer, KmerError>
|
||||
pub fn kmer(&self, i: usize) -> Result<Kmer, KmerError>
|
||||
```
|
||||
|
||||
The bit slice `seq[i*2 .. (i+k)*2]` (Msb0 order) is loaded as a big-endian `u64` via `bitvec::load_be`, then left-shifted to produce the canonical left-aligned layout. One call — no loop, no allocation.
|
||||
The bit slice `seq[i*2 .. (i+k)*2]` (Msb0 order) is loaded as a `u64` via `bitvec::load_be`, then left-shifted to produce the canonical left-aligned layout. One call — no loop, no allocation.
|
||||
|
||||
---
|
||||
|
||||
!!! abstract "Algorithm — Super-kmer reverse complement"
|
||||
```text
|
||||
procedure SuperKmerRevcomp(seq, SEQL):
|
||||
seql ← NKMERS + k − 1 -- nucleotide length
|
||||
seql ← nucleotide length
|
||||
n ← ⌈seql / 4⌉ -- number of bytes
|
||||
shift ← n × 8 − seql × 2 -- padding bits to flush
|
||||
|
||||
|
||||
@@ -174,7 +174,8 @@ Evidence cost decreases by 1 bit/kmer with each doubling of partition count (via
|
||||
|
||||
---
|
||||
|
||||
## Open questions
|
||||
## Alternative: fingerprint evidence
|
||||
|
||||
- **Cross-partition set operations**: strategy B allows unitig-level operations (mark entire chunks present/absent) rather than kmer-level, reducing cost by a factor of m_u.
|
||||
- **Eliminating evidence.bin**: at ~66% of per-layer lookup footprint, `evidence.bin` dominates index size. See [Evidence elimination design discussion](evidence_elimination.md).
|
||||
`evidence.bin` can be replaced by `fingerprint.bin` at index build time (`--approx`) or after the fact (`reindex --approx`). The fingerprint stores b bits per MPHF slot (the low b bits of `kmer.seq_hash()`); verification becomes a single bitfield comparison instead of a unitig dereference. False-positive rate per k-mer query: 1/2^b. With the Findere z parameter, z consecutive k-mers must all match, reducing the effective window FP rate to 1/2^(b·z) while skipping z−1 of every z k-mers. No `.idx` file is written or read in approx mode.
|
||||
|
||||
See [Approximate evidence (Findere fingerprint)](evidence_elimination.md) for the full design and CLI parameters.
|
||||
|
||||
@@ -2,10 +2,28 @@
|
||||
|
||||
`obikmer` is a Rust tool for manipulation, counting, indexing, and set operations on DNA sequences represented as kmer sets.
|
||||
|
||||
## Subcommands
|
||||
|
||||
| Subcommand | Purpose |
|
||||
|-------------|---------|
|
||||
| `superkmer` | Extract super-kmers from a sequence file and write to stdout |
|
||||
| `index` | Build a complete genome index (scatter → dereplicate → count → layered MPHF) |
|
||||
| `merge` | Merge multiple built indexes into one |
|
||||
| `rebuild` | Filter and compact an existing index into a new single-layer index |
|
||||
| `query` | Query an index with sequences and annotate matches |
|
||||
| `dump` | Dump all indexed kmers as CSV (kmer + per-genome counts or presence) |
|
||||
| `annotate` | Add or update genome metadata from a CSV file; or dump metadata as CSV |
|
||||
| `distance` | Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees |
|
||||
| `unitig` | Dump unitigs from a built index to stdout (debug) |
|
||||
| `estimate` | Estimate approximate-index parameters (z, evidence bits, FP rates) before indexing |
|
||||
| `reindex` | Convert an index's evidence in-place: exact ↔ approx |
|
||||
|
||||
## Constraints
|
||||
|
||||
- Target scale: individual genome datasets, tens of Gbases
|
||||
- Maximum efficiency in computation, memory, and disk usage
|
||||
- k odd, k ∈ [11, 31], fixed at runtime; kmer fits in a u64 (2 bits/base)
|
||||
- Canonical form: `min(kmer, revcomp(kmer))` reduces strand-symmetric space by half
|
||||
- Input formats: FASTA, FASTQ, gzip, streaming stdin
|
||||
|
||||
## Priority operations
|
||||
|
||||
+1
-1
@@ -9,7 +9,7 @@ A **kmer** is a DNA subsequence of fixed length k. Two constraints govern the ch
|
||||
|
||||
## Super-kmers
|
||||
|
||||
A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k−1 nucleotides. Each kmer of the run carries the same **canonical minimizer**. The **canonical minimizer** of a kmer is the smallest value of `min(m-mer, revcomp(m-mer))` over all m-mers within the kmer (m < k, m odd), with the constraint that **non-degenerate m-mers are always preferred** over degenerate ones. A degenerate m-mer is one composed of a single repeated nucleotide (all-A, all-C, all-G, or all-T); such m-mers are selected only if no non-degenerate candidate exists in the window.
|
||||
A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k−1 nucleotides, sharing the same **canonical minimizer**. The **canonical minimizer** of a kmer is the m-mer (m < k) whose canonical hash `hash_kmer(min(m-mer, revcomp(m-mer)))` is smallest over all m-mers in the kmer window. The hash function is a `mix64`-based bijection; selection is purely hash-ordered with no degeneracy filter. A super-kmer is capped at 256 nucleotides; a longer run is split at that boundary.
|
||||
|
||||
### Canonical super-kmers
|
||||
|
||||
|
||||
@@ -17,18 +17,22 @@ The Watson-Crick complement of any base is its bitwise NOT on 2 bits: `complemen
|
||||
|
||||
A kmer fits in a single `u64`. Nucleotide 0 occupies bits 63–62, nucleotide i occupies bits 63−2i and 62−2i, and the low 64−2k bits are zero. Extraction of nucleotide i (0 ≤ i < k): `(kmer >> (62 - 2*i)) & 0b11`.
|
||||
|
||||
Reverse complement is computed via a **16-bit lookup table** (65 536 entries × 2 bytes = 128 KB, fits in L2 cache) storing the reverse-complement of every 8-base chunk.
|
||||
Reverse complement is computed by **bit manipulation in four steps**, with no lookup table:
|
||||
|
||||
!!! abstract "Algorithm — Kmer reverse complement"
|
||||
```text
|
||||
procedure KmerRevcomp(kmer, k):
|
||||
raw ← TABLE16[kmer & 0xFFFF] << 48
|
||||
| TABLE16[(kmer >> 16) & 0xFFFF] << 32
|
||||
| TABLE16[(kmer >> 32) & 0xFFFF] << 16
|
||||
| TABLE16[(kmer >> 48) & 0xFFFF]
|
||||
return raw << (64 - 2*k)
|
||||
x ← ~kmer -- complement all bases
|
||||
x ← swap_bytes(x) -- reverse byte order
|
||||
x ← ((x >> 4) & 0x0F0F0F0F0F0F0F0F)
|
||||
| ((x & 0x0F0F0F0F0F0F0F0F) << 4) -- swap nibbles within each byte
|
||||
x ← ((x >> 2) & 0x3333333333333333)
|
||||
| ((x & 0x3333333333333333) << 2) -- swap 2-bit pairs within each nibble
|
||||
return x << (64 - 2*k) -- re-align to MSB
|
||||
```
|
||||
|
||||
The three reorder passes together reverse the order of all 2-bit base codes across the 64-bit word. The bitwise NOT in the first step complements each base (A↔T, C↔G). The final left shift clears the low 64−2k padding bits.
|
||||
|
||||
The **canonical form** is the lexicographic minimum of the kmer and its reverse complement:
|
||||
|
||||
```
|
||||
|
||||
@@ -40,7 +40,7 @@ The filter computes $\hat{H}(ws)$ for each word size ws from 1 to ws_max and ret
|
||||
|
||||
$$\text{entropy}(kmer) = \min_{ws=1}^{ws_{\max}} \hat{H}(ws)$$
|
||||
|
||||
A value near 0 indicates low complexity (e.g. AAAA…); near 1 indicates high complexity. A kmer is rejected if $\text{entropy}(kmer) \leq \theta$, where $\theta$ is a collection parameter. The minimum across word sizes ensures that any scale of repetition is detected independently: polyA is caught at ws=1, dinucleotide repeats at ws=2, etc.
|
||||
A value near 0 indicates low complexity (e.g. AAAA…); near 1 indicates high complexity. A kmer is rejected if $\text{entropy}(kmer) < \theta$, where $\theta$ is a collection parameter (default 0.7). The minimum across word sizes ensures that any scale of repetition is detected independently: polyA is caught at ws=1, dinucleotide repeats at ws=2, etc.
|
||||
|
||||
## Interpretation as an effective number of classes
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ impl KmerPartition {
|
||||
if !layer_dir.exists() { break; }
|
||||
l += 1;
|
||||
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
|
||||
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?;
|
||||
let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
|
||||
|
||||
let counts_dir = layer_dir.join("counts");
|
||||
let presence_dir = layer_dir.join("presence");
|
||||
@@ -97,7 +97,7 @@ impl KmerPartition {
|
||||
let layer_dir = index_dir.join(format!("layer_{layer}"));
|
||||
if !layer_dir.exists() { break; }
|
||||
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
|
||||
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?;
|
||||
let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
|
||||
|
||||
let counts_dir = layer_dir.join("counts");
|
||||
let presence_dir = layer_dir.join("presence");
|
||||
|
||||
@@ -110,7 +110,7 @@ impl KmerPartition {
|
||||
uw.close()?;
|
||||
|
||||
if with_counts {
|
||||
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, |kmer| {
|
||||
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, evidence, |kmer| {
|
||||
match (&mphf1_opt, &counts1_opt) {
|
||||
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
|
||||
_ => 1,
|
||||
@@ -118,13 +118,7 @@ impl KmerPartition {
|
||||
})
|
||||
.map_err(olm_to_sk)?;
|
||||
} else {
|
||||
Layer::<()>::build(&layer_dir, block_bits).map_err(olm_to_sk)?;
|
||||
}
|
||||
|
||||
// For approximate evidence: replace the exact evidence bundle with a
|
||||
// fingerprint. For exact evidence, build() already wrote it.
|
||||
if let EvidenceKind::Approx { b, z } = evidence {
|
||||
Layer::<()>::build_approx_evidence(&layer_dir, *b, *z).map_err(olm_to_sk)?;
|
||||
Layer::<()>::build(&layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
|
||||
}
|
||||
|
||||
// Write meta.json in the index/ directory so LayeredMap::open works
|
||||
|
||||
@@ -9,7 +9,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder,
|
||||
PersistentCompactIntVecBuilder};
|
||||
use obikseq::CanonicalKmer;
|
||||
use obiskio::{SKError, SKResult, UnitigFileReader};
|
||||
use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError};
|
||||
use obilayeredmap::{EvidenceKind, Layer, LayeredMap, MphfLayer, OLMError};
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
|
||||
use crate::partition::KmerPartition;
|
||||
@@ -195,7 +195,7 @@ impl KmerPartition {
|
||||
for l in 0..src_meta.n_layers {
|
||||
let unitigs_path = src_index_dir
|
||||
.join(format!("layer_{l}")).join("unitigs.bin");
|
||||
let reader = UnitigFileReader::open(&unitigs_path)?;
|
||||
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
if dst_map.query(kmer).is_none() {
|
||||
g.push(kmer);
|
||||
@@ -217,7 +217,7 @@ impl KmerPartition {
|
||||
uw.write(&unitig)?;
|
||||
}
|
||||
uw.close()?;
|
||||
Layer::<()>::build(&new_layer_dir, block_bits).map_err(olm_to_sk)?;
|
||||
Layer::<()>::build(&new_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
|
||||
}
|
||||
drop(g);
|
||||
|
||||
@@ -303,7 +303,7 @@ impl KmerPartition {
|
||||
|
||||
for l in 0..src_meta.n_layers {
|
||||
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
||||
let reader = UnitigFileReader::open(&src_layer_dir.join("unitigs.bin"))?;
|
||||
let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?;
|
||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
|
||||
@@ -8,7 +8,7 @@ use obicompactvec::{PersistentBitMatrixBuilder,
|
||||
PersistentCompactIntVecBuilder};
|
||||
use obidebruinj::GraphDeBruijn;
|
||||
use obiskio::{SKError, SKResult, UnitigFileReader};
|
||||
use obilayeredmap::{Layer, MphfLayer, OLMError};
|
||||
use obilayeredmap::{EvidenceKind, Layer, MphfLayer, OLMError};
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
|
||||
use crate::filter::KmerFilter;
|
||||
@@ -116,7 +116,7 @@ impl KmerPartition {
|
||||
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
||||
if !unitigs_path.exists() { continue; }
|
||||
|
||||
let reader = UnitigFileReader::open(&unitigs_path)?;
|
||||
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
@@ -146,7 +146,7 @@ impl KmerPartition {
|
||||
uw.close()?;
|
||||
drop(g);
|
||||
|
||||
Layer::<()>::build(&dst_layer_dir, block_bits).map_err(olm_to_sk)?;
|
||||
Layer::<()>::build(&dst_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
|
||||
let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?;
|
||||
|
||||
// ── Prepare matrix builders (one column per genome) ───────────────────
|
||||
@@ -181,7 +181,7 @@ impl KmerPartition {
|
||||
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
||||
if !unitigs_path.exists() { continue; }
|
||||
|
||||
let reader = UnitigFileReader::open(&unitigs_path)?;
|
||||
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
|
||||
@@ -102,8 +102,8 @@ impl<D: LayerData> Layer<D> {
|
||||
// ── Mode 1 — set membership ───────────────────────────────────────────────────
|
||||
|
||||
impl Layer<()> {
|
||||
pub fn build(out_dir: &Path, block_bits: u8) -> OLMResult<usize> {
|
||||
MphfLayer::build(out_dir, block_bits, &mut |_, _| Ok(()))
|
||||
pub fn build(out_dir: &Path, block_bits: u8, evidence_kind: &EvidenceKind) -> OLMResult<usize> {
|
||||
MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |_, _| Ok(()))
|
||||
}
|
||||
|
||||
/// Create a presence matrix for a set-membership layer (first merge).
|
||||
@@ -126,6 +126,7 @@ impl Layer<PersistentCompactIntMatrix> {
|
||||
pub fn build(
|
||||
out_dir: &Path,
|
||||
block_bits: u8,
|
||||
evidence_kind: &EvidenceKind,
|
||||
count_of: impl Fn(CanonicalKmer) -> u32,
|
||||
) -> OLMResult<usize> {
|
||||
let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||
@@ -133,7 +134,7 @@ impl Layer<PersistentCompactIntMatrix> {
|
||||
let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir)
|
||||
.map_err(OLMError::Io)?;
|
||||
let mut col = mb.add_col().map_err(OLMError::Io)?;
|
||||
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| {
|
||||
let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
|
||||
col.set(slot, count_of(kmer));
|
||||
Ok(())
|
||||
})?;
|
||||
@@ -145,9 +146,10 @@ impl Layer<PersistentCompactIntMatrix> {
|
||||
pub fn build_from_map(
|
||||
out_dir: &Path,
|
||||
block_bits: u8,
|
||||
evidence_kind: &EvidenceKind,
|
||||
counts: &HashMap<CanonicalKmer, u32>,
|
||||
) -> OLMResult<usize> {
|
||||
Self::build(out_dir, block_bits, |kmer| counts.get(&kmer).copied().unwrap_or(0))
|
||||
Self::build(out_dir, block_bits, evidence_kind, |kmer| counts.get(&kmer).copied().unwrap_or(0))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,6 +179,7 @@ impl Layer<PersistentBitMatrix> {
|
||||
pub fn build_presence(
|
||||
out_dir: &Path,
|
||||
block_bits: u8,
|
||||
evidence_kind: &EvidenceKind,
|
||||
n_genomes: usize,
|
||||
present_in: impl Fn(CanonicalKmer, usize) -> bool,
|
||||
) -> OLMResult<usize> {
|
||||
@@ -186,7 +189,7 @@ impl Layer<PersistentBitMatrix> {
|
||||
let mut cols: Vec<_> = (0..n_genomes)
|
||||
.map(|_| mb.add_col().map_err(OLMError::Io))
|
||||
.collect::<OLMResult<_>>()?;
|
||||
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| {
|
||||
let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
|
||||
for (g, col) in cols.iter_mut().enumerate() {
|
||||
col.set(slot, present_in(kmer, g));
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
|
||||
use obicompactvec::PersistentCompactIntMatrix;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obiskio::{UnitigFileWriter, DEFAULT_BLOCK_BITS};
|
||||
use crate::meta::EvidenceKind;
|
||||
|
||||
use crate::error::OLMResult;
|
||||
use crate::layer::{Hit, Layer, LayerData};
|
||||
@@ -90,7 +91,7 @@ impl LayeredMap<()> {
|
||||
pub fn push_layer(&mut self) -> OLMResult<usize> {
|
||||
let i = self.layers.len();
|
||||
let dir = layer_dir(&self.root, i);
|
||||
Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS)?;
|
||||
Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact)?;
|
||||
self.append_layer()?;
|
||||
Ok(i)
|
||||
}
|
||||
@@ -102,7 +103,7 @@ impl LayeredMap<PersistentCompactIntMatrix> {
|
||||
pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
|
||||
let i = self.layers.len();
|
||||
let dir = layer_dir(&self.root, i);
|
||||
Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, count_of)?;
|
||||
Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, count_of)?;
|
||||
self.append_layer()?;
|
||||
Ok(i)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::path::Path;
|
||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||
use epserde::prelude::*;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obiskio::{UnitigFileReader, UnitigFileWriter, build_unitig_idx};
|
||||
use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx};
|
||||
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
|
||||
|
||||
use crate::error::{OLMError, OLMResult};
|
||||
@@ -198,22 +198,35 @@ impl MphfLayer {
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
/// Build MPHF and exact evidence from the unitigs file already present in
|
||||
/// `dir`. Calls `fill_slot(slot, kmer)` once per kmer for DataStore
|
||||
/// population. Returns the number of kmers indexed.
|
||||
/// Build MPHF then evidence from the unitigs file already present in `dir`.
|
||||
///
|
||||
/// - Exact: `.idx` is built for pass-1 parallel construction and kept for
|
||||
/// query-time kmer verification. `evidence.bin` is written.
|
||||
/// - Approx: pass-1 uses `open_sequential` + `par_bridge` — no `.idx` is
|
||||
/// ever created. `fingerprint.bin` is written.
|
||||
///
|
||||
/// `fill_slot(slot, kmer)` is called once per kmer in both modes.
|
||||
pub(crate) fn build(
|
||||
dir: &Path,
|
||||
block_bits: u8,
|
||||
evidence_kind: &EvidenceKind,
|
||||
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
|
||||
) -> OLMResult<usize> {
|
||||
use rayon::prelude::*;
|
||||
|
||||
let unitig_path = dir.join(UNITIGS_FILE);
|
||||
|
||||
build_unitig_idx(&unitig_path, block_bits)?;
|
||||
|
||||
let unitigs = UnitigFileReader::open(&unitig_path)?;
|
||||
let n = unitigs.n_kmers();
|
||||
match evidence_kind {
|
||||
// ── Exact path ────────────────────────────────────────────────────
|
||||
// .idx is built LAST, once evidence.bin is written, so it is never
|
||||
// present during construction — only at query time.
|
||||
EvidenceKind::Exact => {
|
||||
let n = UnitigFileReader::open_sequential(&unitig_path)?.n_kmers();
|
||||
let keys = CanonicalKmerIter::new(&unitig_path)
|
||||
.map_err(|e| match e {
|
||||
obiskio::SKError::Io(io) => OLMError::Io(io),
|
||||
e => OLMError::InvalidLayer(e.to_string()),
|
||||
})?;
|
||||
|
||||
if n == 0 {
|
||||
fs::File::create(dir.join(EVIDENCE_FILE))?;
|
||||
@@ -223,20 +236,18 @@ impl MphfLayer {
|
||||
mphf.store(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
LayerMeta::exact().save(dir)?;
|
||||
build_unitig_idx(&unitig_path, block_bits)?;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Pass 1 — build MPHF (parallel, random access via .idx)
|
||||
let keys = (0..unitigs.len())
|
||||
.into_par_iter()
|
||||
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
|
||||
// Pass 1 — MPHF construction via clonable mmap iterator
|
||||
let mphf: Mphf =
|
||||
Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
|
||||
Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::<CubicEps>::default());
|
||||
mphf.store(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
|
||||
// Pass 2 — fill evidence + mode-specific data via callback
|
||||
let unitigs2 = UnitigFileReader::open(&unitig_path)?;
|
||||
// Pass 2 — sequential: fill evidence.bin + callback
|
||||
let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
|
||||
let mut ev = EvidenceWriter::new(n);
|
||||
let mut seen = vec![0u8; (n + 7) / 8];
|
||||
|
||||
@@ -257,6 +268,65 @@ impl MphfLayer {
|
||||
|
||||
ev.write(&dir.join(EVIDENCE_FILE))?;
|
||||
LayerMeta::exact().save(dir)?;
|
||||
// .idx built last: strictly for query-time kmer verification
|
||||
build_unitig_idx(&unitig_path, block_bits)?;
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
// ── Approx path ───────────────────────────────────────────────────
|
||||
// No .idx is created at any point.
|
||||
EvidenceKind::Approx { b, z } => {
|
||||
let unitigs = UnitigFileReader::open_sequential(&unitig_path)?;
|
||||
let n = unitigs.n_kmers();
|
||||
|
||||
if n == 0 {
|
||||
FingerprintVecWriter::new(0, *b).write(&dir.join(FINGERPRINT_FILE))?;
|
||||
let mphf: Mphf =
|
||||
Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
|
||||
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
|
||||
mphf.store(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
LayerMeta::approx(*b, *z).save(dir)?;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Pass 1 — MPHF construction via mmap-backed clonable iterator.
|
||||
// No .idx is created. par_bridge() parallelises the sequential scan;
|
||||
// Clone on CanonicalKmerRawIter shares the Arc<Mmap> and resets to pos 0.
|
||||
let keys = CanonicalKmerIter::new(&unitig_path)
|
||||
.map_err(|e| match e {
|
||||
obiskio::SKError::Io(io) => OLMError::Io(io),
|
||||
e => OLMError::InvalidLayer(e.to_string()),
|
||||
})?;
|
||||
let mphf: Mphf =
|
||||
Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::<CubicEps>::default());
|
||||
mphf.store(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
|
||||
// Pass 2 — sequential: fill fingerprint.bin + callback
|
||||
let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
|
||||
let mut fw = FingerprintVecWriter::new(n, *b);
|
||||
let mut seen = vec![0u8; (n + 7) / 8];
|
||||
|
||||
for kmer in unitigs2.iter_canonical_kmers() {
|
||||
let slot = mphf.index(&kmer.raw());
|
||||
if slot >= n {
|
||||
return Err(OLMError::Mphf("slot out of bounds".into()));
|
||||
}
|
||||
let byte = slot / 8;
|
||||
let bit = 1u8 << (slot % 8);
|
||||
if seen[byte] & bit != 0 {
|
||||
return Err(OLMError::Mphf("duplicate slot".into()));
|
||||
}
|
||||
seen[byte] |= bit;
|
||||
fw.set(slot, kmer.seq_hash());
|
||||
fill_slot(slot, kmer)?;
|
||||
}
|
||||
|
||||
fw.write(&dir.join(FINGERPRINT_FILE))?;
|
||||
LayerMeta::approx(*b, *z).save(dir)?;
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ use super::*;
|
||||
use obicompactvec::PersistentCompactIntMatrix;
|
||||
use obikseq::{set_k, Kmer, Sequence as _, Unitig};
|
||||
use obiskio::DEFAULT_BLOCK_BITS;
|
||||
use crate::meta::EvidenceKind;
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn write_unitigs(dir: &Path, seqs: &[&[u8]]) {
|
||||
@@ -24,7 +25,7 @@ fn build_and_query_all_kmers_found() {
|
||||
let dir = tempdir().unwrap();
|
||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||
let kmers = all_canonical_kmers(dir.path());
|
||||
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap();
|
||||
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
|
||||
let layer = Layer::<()>::open(dir.path()).unwrap();
|
||||
for kmer in kmers {
|
||||
assert!(layer.query(kmer).is_some(), "kmer should be present");
|
||||
@@ -42,6 +43,7 @@ fn counts_are_stored_and_retrieved() {
|
||||
Layer::<PersistentCompactIntMatrix>::build(
|
||||
dir.path(),
|
||||
DEFAULT_BLOCK_BITS,
|
||||
&EvidenceKind::Exact,
|
||||
|kmer| count_map.get(&kmer).copied().unwrap_or(0),
|
||||
).unwrap();
|
||||
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
|
||||
@@ -56,7 +58,7 @@ fn query_absent_returns_none() {
|
||||
set_k(4);
|
||||
let dir = tempdir().unwrap();
|
||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap();
|
||||
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
|
||||
let layer = Layer::<()>::open(dir.path()).unwrap();
|
||||
let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical();
|
||||
assert!(layer.query(absent).is_none());
|
||||
@@ -67,7 +69,7 @@ fn open_after_build_is_consistent() {
|
||||
set_k(4);
|
||||
let dir = tempdir().unwrap();
|
||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||
let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, |_| 7).unwrap();
|
||||
let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, |_| 7).unwrap();
|
||||
assert_eq!(n, 4);
|
||||
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
|
||||
let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical();
|
||||
|
||||
@@ -8,7 +8,8 @@ pub use error::{SKError, SKResult};
|
||||
pub use meta::SKFileMeta;
|
||||
pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with};
|
||||
pub use reader::{SKFileIter, SKFileReader};
|
||||
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS};
|
||||
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS,
|
||||
CanonicalKmerIter};
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::fs::File;
|
||||
use std::io::{BufWriter, Write as _};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use memmap2::Mmap;
|
||||
use obikseq::{CanonicalKmer, Kmer, Unitig};
|
||||
@@ -439,6 +440,85 @@ fn extract_kmer_raw(bytes: &[u8], j: usize, k: usize) -> u64 {
|
||||
raw << (64 - 2 * k)
|
||||
}
|
||||
|
||||
// ── CanonicalKmerRawIter ──────────────────────────────────────────────────────
|
||||
|
||||
// ── CanonicalKmerIter ─────────────────────────────────────────────────────────
|
||||
|
||||
/// Sequential iterator over [`CanonicalKmer`] from a `unitigs.bin` file.
|
||||
///
|
||||
/// Holds an `Arc<Mmap>` so that `Clone` is O(1): both copies share the same
|
||||
/// memory-mapped pages. Cloning resets the cursor to position 0 — this lets
|
||||
/// ptr_hash's `new_from_par_iter` (which requires a `Clone`-able parallel
|
||||
/// iterator via `par_bridge()`) make multiple passes without ever creating
|
||||
/// a `.idx` file.
|
||||
pub struct CanonicalKmerIter {
|
||||
mmap: Arc<Mmap>,
|
||||
k: usize,
|
||||
chunk_pos: usize, // byte offset of the current chunk header
|
||||
data_pos: usize, // byte offset of the current chunk's sequence bytes
|
||||
n_kmers: usize, // kmers in current chunk
|
||||
kmer_idx: usize, // next kmer index to yield within the current chunk
|
||||
}
|
||||
|
||||
impl CanonicalKmerIter {
|
||||
pub fn new(path: &Path) -> SKResult<Self> {
|
||||
let file = File::open(path).map_err(SKError::Io)?;
|
||||
let mmap = Arc::new(unsafe { Mmap::map(&file).map_err(SKError::Io)? });
|
||||
let k = obikseq::params::k();
|
||||
let mut s = Self { mmap, k, chunk_pos: 0, data_pos: 0, n_kmers: 0, kmer_idx: 0 };
|
||||
s.load_chunk();
|
||||
Ok(s)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn load_chunk(&mut self) {
|
||||
if self.chunk_pos < self.mmap.len() {
|
||||
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
|
||||
self.n_kmers = seql_minus_k + 1;
|
||||
self.data_pos = self.chunk_pos + 1;
|
||||
self.kmer_idx = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for CanonicalKmerIter {
|
||||
fn clone(&self) -> Self {
|
||||
let mut c = Self {
|
||||
mmap: Arc::clone(&self.mmap),
|
||||
k: self.k,
|
||||
chunk_pos: 0,
|
||||
data_pos: 0,
|
||||
n_kmers: 0,
|
||||
kmer_idx: 0,
|
||||
};
|
||||
c.load_chunk();
|
||||
c
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for CanonicalKmerIter {
|
||||
type Item = CanonicalKmer;
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Option<CanonicalKmer> {
|
||||
loop {
|
||||
if self.chunk_pos >= self.mmap.len() {
|
||||
return None;
|
||||
}
|
||||
if self.kmer_idx < self.n_kmers {
|
||||
let raw = extract_kmer_raw(&self.mmap[self.data_pos..], self.kmer_idx, self.k);
|
||||
let canon = canonical_raw(raw, self.k);
|
||||
self.kmer_idx += 1;
|
||||
return Some(CanonicalKmer::from_raw_unchecked(canon));
|
||||
}
|
||||
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
|
||||
let byte_len = (seql_minus_k + self.k + 3) / 4;
|
||||
self.chunk_pos += 1 + byte_len;
|
||||
self.load_chunk();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "tests/unitig_index.rs"]
|
||||
mod tests;
|
||||
|
||||
Reference in New Issue
Block a user