Merge pull request 'Push vqmnuyrkpxot' (#9) from push-vqmnuyrkpxot into main

Reviewed-on: #9
This commit was merged in pull request #9.
This commit is contained in:
2026-05-26 09:05:50 +00:00
24 changed files with 716 additions and 290 deletions
+3
View File
@@ -8,6 +8,9 @@ Ne modifier aucun fichier à moins d'une demande explicite de modification. En p
**Règle absolue : ne jamais substituer une dépendance ou une bibliothèque sans validation explicite.** **Règle absolue : ne jamais substituer une dépendance ou une bibliothèque sans validation explicite.**
Si une dépendance demandée pose problème (erreur de compilation, bug, API manquante), exposer le problème et proposer des alternatives — ne jamais switcher silencieusement vers une autre bibliothèque. Le choix des dépendances est une décision d'architecture qui appartient au développeur. Si une dépendance demandée pose problème (erreur de compilation, bug, API manquante), exposer le problème et proposer des alternatives — ne jamais switcher silencieusement vers une autre bibliothèque. Le choix des dépendances est une décision d'architecture qui appartient au développeur.
**Règle absolue : le code existant est une hypothèse, pas une vérité.**
Quand une nouvelle construction (type, itérateur, abstraction) rend du code historique injustifié, le signaler immédiatement et proposer de le supprimer — ne pas conserver les deux en parallèle par inertie. Le développeur demande explicitement de remettre en cause le code base : ne pas attendre qu'il insiste.
Tu maintiens en **anglais**, dense et sans remplissage, les documents suivants : Tu maintiens en **anglais**, dense et sans remplissage, les documents suivants :
- `docmd/index.md` — document de discussion de base, enrichi progressivement au fil de nos échanges ; il reflète l'état courant de la réflexion sur le projet - `docmd/index.md` — document de discussion de base, enrichi progressivement au fil de nos échanges ; il reflète l'état courant de la réflexion sur le projet
- les autres fichiers Markdown dans `docmd/` selon leur thème respectif - les autres fichiers Markdown dans `docmd/` selon leur thème respectif
+81 -48
View File
@@ -19,34 +19,80 @@ Given a set of query sequences, determine for each sequence how many of its k-me
The query follows the same superkmer-based partitioning strategy used at indexing time. The query follows the same superkmer-based partitioning strategy used at indexing time.
``` ```
for each query sequence: for each batch of sequences:
decompose into superkmers (non-ACGT breaks, same minimiser scheme as indexing) build QueryBatch: decompose all sequences into superkmers, deduplicate
for each superkmer: split superkmers by partition via minimiser hash
route to partition p via minimiser hash for each partition p:
for each kmer in the superkmer: query_partition(p, superkmers_routed_to_p)
lookup kmer in partition p (MPHF → evidence check → matrix row) → load QueryLayer(s) for p
accumulate result into per-sequence accumulators → for each kmer in each superkmer: MphfLayer::find(kmer)
emit annotated sequence broadcast results back to each (seq_idx, kmer_offset) that referenced the superkmer
emit annotated sequences
``` ```
Parallelism is **per sequence**: each worker thread handles all partitions of one sequence independently. This avoids cross-thread coordination when merging partial results and keeps memory usage proportional to the number of concurrent sequences rather than to the number of partitions. Superkmers that appear more than once in the batch (same sequence or across sequences) are deduplicated: each unique `RoutableSuperKmer` is queried once per partition, and the result is broadcast to every `SKDesc` entry that references it.
Parallelism is **not yet active** in the current implementation: batches are processed sequentially on a single thread despite the `--threads` flag being parsed. The `QueryBatch` / `split_by_partition` design is structured to support per-partition parallelism in a future iteration.
--- ---
## Exact vs. approximate matching ## Layer lookup: `MphfLayer::find`
### Exact (default) `MphfLayer::open` reads `layer_meta.json` and loads either exact or approximate evidence. The caller (`QueryLayer::find`) never chooses the dispatch path — it is fixed at open time by `LayerEvidence`:
Standard MPHF lookup followed by evidence check. O(1) per k-mer. ```rust
pub fn find(&self, kmer: CanonicalKmer) -> Option<usize> {
match &self.ev {
LayerEvidence::Exact { .. } => self.find_exact(kmer),
LayerEvidence::Approx { .. } => self.find_approx(kmer),
}
}
```
### 1-mismatch (`--mismatch` flag) ### Exact layers
For each k-mer of the query, generate all `3·k` single-substitution variants. Each variant is canonicalised and looked up independently in the index. If one or more variants are found, their per-genome rows are **summed** into the result for that k-mer position. `find_exact` maps the k-mer through the MPHF to a slot, then calls `UnitigFileReader::verify_canonical_kmer(chunk_id, rank, kmer)` to confirm the stored k-mer matches. Zero false positives. Requires `UnitigFileReader::open()` (random-access via `.idx`); `open_sequential()` cannot serve random-access verification.
- If a k-mer matches exactly AND one of its variants also matches (distinct k-mers in the index), both contributions are accumulated. ### Approximate layers
- Exact and approximate matches are tracked separately in the output (see annotation schema below).
- The superkmer routing optimisation is **not** used in 1-mismatch mode: each variant is looked up directly via its own minimiser. `find_approx` maps the k-mer through the MPHF, then checks a stored `b`-bit fingerprint of the canonical hash. False-positive rate: **1/2^b per k-mer query**. No `.idx` file is needed; the layer carries only `fingerprint.bin`.
- Cost: up to `3·k` MPHF probes per k-mer position vs. 1 in exact mode.
For a query window of z consecutive k-mers (Findere scheme), the false-positive rate per window is **1/2^(b·z)**. The `z` parameter is recorded in `layer_meta.json` at build time but is not enforced during querying — the caller is responsible for interpreting window-level results accordingly.
### `QueryLayer` variant selection
`QueryLayer::open` in `query_layer.rs` selects the data matrix to pair with `MphfLayer`:
| Condition | Variant | Data returned per k-mer |
|---|---|---|
| `with_counts=true` and `counts/` exists | `Count` | raw count per genome |
| `presence/` exists | `Presence` | 0/1 per genome (bit matrix) |
| only `counts/` exists | `Count` | counts used as-is |
| neither exists | `SetOnly` | 1 for every genome |
---
## `open()` vs `open_sequential()`
`UnitigFileReader::open()` loads the `.idx` block-offset table, enabling random access to individual unitig chunks. It is required whenever `verify_canonical_kmer` is called (exact layers at query time).
`UnitigFileReader::open_sequential()` skips the `.idx` and supports only forward iteration. It is sufficient for:
- build passes that scan all unitigs sequentially (`build_exact_evidence`, `build_approx_evidence`);
- the `unitig` subcommand, which iterates and prints unitigs without random access.
`KmerIndex::open()` (called by `query::run`) triggers `MphfLayer::open` for each layer, which calls `UnitigFileReader::open()` for exact layers. Approximate layers do not open a unitig reader at all.
---
## Presence / count mode at query time
The `--force-presence` flag and `--presence-threshold` control how per-genome values are accumulated, independently of what the index stores:
```
genome_totals[g] += if presence { u32::from(v >= threshold) } else { v }
```
`presence` is true when `--force-presence` is set or when the index has no counts (`!with_counts`). The default `presence_threshold` is 1, so any nonzero count counts as a match.
--- ---
@@ -55,57 +101,44 @@ For each k-mer of the query, generate all `3·k` single-substitution variants. E
Output sequences are written in **OBITools4 format**: the original sequence with a JSON annotation map in the title line. Output sequences are written in **OBITools4 format**: the original sequence with a JSON annotation map in the title line.
``` ```
>read_id {"kmer_total":150,"kmer_found":59,...} >read_id {"kmer_count":59,"kmer_strict_matches":{"genome_a":42,"genome_b":7,...}}
ATCGATCG... ATCGATCG...
``` ```
Genome order in all list-valued annotations follows the genome order recorded in `index.meta`. Genome keys in `kmer_strict_matches` are genome labels from `index.meta`. Key order follows iteration order of `meta.genomes`.
--- ---
## Annotation schema ## Annotation schema (current implementation)
### Summary mode (default)
| Key | Type | Condition | Semantics | | Key | Type | Condition | Semantics |
|---|---|---|---| |---|---|---|---|
| `kmer_total` | int | always | total k-mers in the (masked) sequence | | `kmer_count` | int | always | k-mers with at least one match |
| `kmer_found` | int | always | k-mers with at least one match (exact or approx) | | `kmer_missing` | int | `--count-missing` | k-mers absent from every layer |
| `kmer_missing` | int | `--count-missing` | k-mers absent from the index | | `kmer_strict_matches` | object | always | per-genome accumulated value (label → count or 0/1) |
| `kmer_match` | list[int] | always | per-genome matched k-mer count (exact + approx) |
| `kmer_match_exact` | list[int] | `--mismatch` | per-genome exact match count |
| `kmer_match_approx` | list[int] | `--mismatch` | per-genome approx match count |
| `count_match` | list[int] | count index | per-genome sum of index counts for matched k-mers |
`kmer_match[i]` is the number of k-mer positions in the query that contribute at least one match to genome i. In 1-mismatch mode, a single k-mer position can contribute to multiple genomes if several of its variants are present in the index. `kmer_count` counts matched k-mer positions (incremented once per `Some(row)` hit regardless of how many genomes are covered). `kmer_missing` counts `None` hits.
`count_match[i]` sums raw index counts across all matched k-mer positions for genome i. Only meaningful for count indexes. **Note on doc/impl divergence:** the doc previously used keys `kmer_total`, `kmer_found`, and `kmer_match` (list). The implementation uses `kmer_count` (int, matched only) and `kmer_strict_matches` (object keyed by genome label). `--mismatch` and `--detail` are parsed but not yet implemented and emit a warning.
### Detail mode (`--detail`)
All summary keys, plus per-position coverage vectors — one list per genome, length `len(sequence) k + 1`:
| Key | Type | Condition | Semantics |
|---|---|---|---|
| `cov_<i>` | list[int] | `--detail` | coverage at each k-mer position for genome i; raw count (count index) or 0/1 (presence index); 0 if absent |
| `cov_exact_<i>` | list[int] | `--detail` + `--mismatch` | exact-match contribution per position |
| `cov_approx_<i>` | list[int] | `--detail` + `--mismatch` | approx-match contribution per position |
Genome indices in key names are 0-based integers matching the `index.meta` genome order. Genome labels are not used as key names to avoid issues with special characters in long or complex genome identifiers.
--- ---
## CLI ## CLI
``` ```
obikmer query -i <index> [--summary | --detail] [--mismatch] [--count-missing] <query.fa> obikmer query -i <index> [--detail] [--mismatch] [--count-missing]
[--force-presence] [--presence-threshold <n>]
[-T <threads>] <query.fa> [<query2.fa> ...]
``` ```
`--summary` is the default; `--detail` implies `--summary` (all summary keys are always present). `--mismatch` and `--detail` are accepted but currently ignored with a warning on stderr.
--- ---
## Future work ## Future work
- **Read classification** (`--classify`): assign each read to the genome with the highest `kmer_match` score; emit as a single annotation key. - **`--mismatch`**: 1-mismatch approximate matching — generate `3·k` single-substitution variants per k-mer, look each up independently.
- **Whitelist / blacklist filtering**: accept or reject sequences based on whether their k-mer match score for a designated set of genomes exceeds a threshold. - **`--detail`**: per-position coverage vectors (`cov_<i>`) per genome.
- **Read classification** (`--classify`): assign each read to the genome with the highest match score.
- **Parallelism**: activate per-partition or per-sequence worker threads using the already-parsed `--threads` value.
- **Whitelist / blacklist filtering**: threshold-based accept/reject on per-genome match scores.
+17 -26
View File
@@ -2,19 +2,11 @@
The `obiread` crate provides a streaming iterator that reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. Chunks are consumed in parallel by downstream workers. The `obiread` crate provides a streaming iterator that reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. Chunks are consumed in parallel by downstream workers.
## Output type: rope ## Output type: Rope
Each chunk is a `Vec<Bytes>` — a **rope**: a list of reference-counted byte slices that are not necessarily contiguous in memory. The consumer iterates over the slices in order. Each chunk is a `Rope` — a segmented byte sequence: a `Vec` of blocks, where each block is a `Vec<Cell<u8>>`. The consumer iterates over the blocks via a forward or backward cursor.
Using `bytes::Bytes` means the split at the record boundary is O(1): `Bytes::split_to(n)` adjusts a reference counter, not memory. No `memcpy` in the common case. `Rope::split_off(pos)` splits at an absolute byte offset in O(log n) (binary search over block-start index). If `pos` falls inside a block, that block is split in two via `Vec::split_off` — no `memcpy` in the common case.
## Allocation policy
| Case | Cost |
|------|------|
| Boundary found in the current block (common) | zero extra allocation — `split_to` only |
| Boundary straddles multiple blocks (sequence > block size, rare) | one allocation to pack the rope into a flat buffer |
| EOF flush | zero extra allocation |
## SeqChunkIter ## SeqChunkIter
@@ -22,7 +14,7 @@ Using `bytes::Bytes` means the split at the record boundary is O(1): `Bytes::spl
pub struct SeqChunkIter<R: Read> { /* private */ } pub struct SeqChunkIter<R: Read> { /* private */ }
impl<R: Read> Iterator for SeqChunkIter<R> { impl<R: Read> Iterator for SeqChunkIter<R> {
type Item = io::Result<Vec<Bytes>>; type Item = io::Result<Rope>;
} }
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R> pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R>
@@ -32,22 +24,21 @@ pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R>
`next()` loop: `next()` loop:
```text ```text
1. read one block of block_size bytes → push onto rope 1. read one block of block_size bytes → push onto Rope
2. probe check: if the boundary marker ("\n>" or "\n@") is absent from the 2. call splitter(rope) → Option<abs_offset>
last block, skip the splitter (avoids a full backward scan for nothing) if Some(pos):
3. call splitter on last block tail = rope.split_off(pos) ← O(log n), may split one block
if found at offset n: chunk = mem::replace(&mut rope, tail)
remainder = last_block.split_to(n) ← O(1), zero copy return Some(Ok(chunk))
return std::mem::take(&mut self.rope) ← the chunk 3. if EOF and rope non-empty: return Some(Ok(rope)) as final chunk
4. if rope.len() > 1 (multi-block accumulation): 4. if EOF and rope empty: return None
pack rope into one flat buffer ← one alloc
retry splitter on flat buffer
5. if EOF: flush remaining rope as final chunk
``` ```
The `Splitter` function signature is `fn(&Rope) -> Option<usize>`. It returns the absolute byte offset of the start of the last complete record, or `None` if no boundary was found in the accumulated rope (need more data).
## Boundary detection — FASTA ## Boundary detection — FASTA
Backward scan with a 2-state machine. Searches for `>` immediately preceded by `\n` or `\r`: Backward scan with a 2-state machine. Searches (right to left) for `>` followed by `\n` or `\r` (i.e., a `>` that is preceded by a newline in forward order):
```mermaid ```mermaid
stateDiagram-v2 stateDiagram-v2
@@ -58,13 +49,13 @@ stateDiagram-v2
FoundGt --> [*] : '\\n' / '\\r' ✓ FoundGt --> [*] : '\\n' / '\\r' ✓
``` ```
Returns the byte offset of the `>` that starts the last complete record. Returns the byte offset of the `>` that starts the last complete record. Returns `None` if only one `>` is found (cannot confirm there is a prior complete record).
## Boundary detection — FASTQ ## Boundary detection — FASTQ
FASTQ records have a rigid 4-line structure (`@header`, sequence, `+`, quality). The `@` character (ASCII 64, Phred score 31) can appear legitimately in quality lines, making any forward heuristic unreliable. The backward scanner verifies the full structural context before accepting a candidate `@`. FASTQ records have a rigid 4-line structure (`@header`, sequence, `+`, quality). The `@` character (ASCII 64, Phred score 31) can appear legitimately in quality lines, making any forward heuristic unreliable. The backward scanner verifies the full structural context before accepting a candidate `@`.
7-state machine (port of Go's `EndOfLastFastqEntry`), scanning from **right to left**. Each time a `+` is found, its position is saved as `restart`; any state mismatch resets the scan to that position. 7-state machine (states 06), scanning from **right to left**. Each time a `+` is found, its position is saved as `restart`; any state mismatch resets the scan to that position.
```mermaid ```mermaid
stateDiagram-v2 stateDiagram-v2
+44 -13
View File
@@ -1,38 +1,57 @@
# Kmer — implementation # Kmer — implementation
## Memory layout ## Types and layout
`Kmer` is a `#[repr(transparent)]` newtype over `u64`: `KmerOf<L>` is a `#[repr(transparent)]` newtype over `u64` parameterized by a `KmerLength` marker:
```rust ```rust
#[repr(transparent)] #[repr(transparent)]
pub struct Kmer(u64); pub struct KmerOf<L: KmerLength>(u64, PhantomData<L>);
``` ```
Nucleotides are packed 2 bits each, **left-aligned**, MSB-first. Nucleotide 0 occupies bits 6362; nucleotide i occupies bits 632i and 622i. The low 642k bits are always zero. k is **not stored** — it is a parameter of every operation that needs it, and will be owned by the future collection-level indexer. Three marker types implement `KmerLength`:
| Marker | `len()` source | Used for |
|--------|---------------|---------|
| `KLen` | `params::k()` | k-mers |
| `MLen` | `params::m()` | minimizers |
| `ConstLen<N>` | const generic `N` | tests |
Public aliases:
```rust
pub type Kmer = KmerOf<KLen>; // k-mer, global k
pub type Minimizer = CanonicalKmerOf<MLen>; // canonical m-mer, global m
```
Nucleotides are packed 2 bits each, **left-aligned**, MSB-first. Nucleotide 0 occupies bits 6362; nucleotide i occupies bits 632i and 622i. The low 642·len bits are always zero. The length is **not stored** — every operation reads it from `L::len()`.
| 6362 | 6160 | … | 632(k1)1 to 632(k1) | 632k down to 0 | | 6362 | 6160 | … | 632(k1)1 to 632(k1) | 632k down to 0 |
|-------|-------|---|--------------------------|-----------------| |-------|-------|---|--------------------------|-----------------|
| nt 0 | nt 1 | … | nt k1 | zero padding | | nt 0 | nt 1 | … | nt k1 | zero padding |
## Global parameters
`params::set_k(k)` / `params::k()` and `params::set_m(m)` / `params::m()` are backed by `OnceLock<usize>` in production (write-once, panic on conflict) and by `thread_local! { Cell<usize> }` in test builds (per-thread, freely writable). `params::init(k, m)` sets both in one call.
## Encoding ## Encoding
`Kmer::from_ascii(ascii, k)` encodes the first k bytes of an ASCII slice using the shared `ENC` table (see [SuperKmer — ASCII encoding](superkmer.md#ascii-encoding-and-decoding)): `KmerOf::<L>::from_ascii(ascii)` encodes the first `L::len()` bytes using the shared `ENC` table (see [SuperKmer — ASCII encoding](superkmer.md#ascii-encoding-and-decoding)):
```rust ```rust
for i in 0..k { for i in 0..k {
val = (val << 2) | encode_base(ascii[i]) as u64; val = (val << 2) | encode_base(ascii[i]) as u64;
} }
Kmer(val << (64 - 2 * k)) KmerOf(val << (64 - 2 * k), PhantomData)
``` ```
Zero allocation — result lives on the stack. Zero allocation — result lives on the stack.
## Decoding ## Decoding
`write_ascii(k, buf)` appends k ASCII characters to a caller-supplied `Vec<u8>` using the shared `DEC4` table: one lookup per 4 nucleotides, two partial-byte lookups for the remainder. No allocation in the hot path. `write_ascii(writer)` writes k ASCII characters to any `W: Write` using the shared `DEC4` table: one lookup per 4 nucleotides, one partial lookup for the remainder. No allocation in the hot path.
`to_ascii(k)` is a convenience wrapper that allocates and returns a `Vec<u8>`; intended for tests and display only. `to_ascii()` is a convenience wrapper that allocates and returns a `Vec<u8>`; intended for tests and display only.
## Reverse complement ## Reverse complement
@@ -43,18 +62,30 @@ let x = !self.0; /
let x = x.swap_bytes(); // reverse bytes let x = x.swap_bytes(); // reverse bytes
let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4); // swap nibbles let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4); // swap nibbles
let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2); // swap 2-bit groups let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2); // swap 2-bit groups
Kmer(x << (64 - 2 * k)) KmerOf(x << (64 - 2 * k), PhantomData)
``` ```
After complementing, bytes are reversed (`swap_bytes`), then nibbles, then 2-bit groups — restoring 2-bit nucleotides to their correct positions in reverse order. A final left-shift realigns to MSB. Zero allocation — result lives on the stack. After complementing, bytes are reversed (`swap_bytes`), then nibbles, then 2-bit groups — restoring 2-bit nucleotides to their correct positions in reverse order. A final left-shift realigns to MSB. Zero allocation — result lives on the stack.
## Canonical form ## Canonical form and `CanonicalKmerOf`
`canonical()` returns a `CanonicalKmerOf<L>` — a distinct newtype that carries the same `u64` layout but enforces the invariant that the stored value equals `min(kmer, revcomp)`:
```rust ```rust
pub fn canonical(&self, k: usize) -> Self { pub fn canonical(&self) -> CanonicalKmerOf<L> {
let rc = self.revcomp(k); let rc = self.revcomp();
if self.0 <= rc.0 { *self } else { rc } CanonicalKmerOf(if self.0 <= rc.0 { self.0 } else { rc.0 }, PhantomData)
} }
``` ```
Lexicographic minimum of forward and reverse-complement, comparing the raw `u64` values directly (left-aligned encoding makes this equivalent to nucleotide-wise comparison). Zero allocation — result lives on the stack. Lexicographic minimum of forward and reverse-complement, comparing the raw `u64` values directly (left-aligned encoding makes this equivalent to nucleotide-wise comparison). Zero allocation — result lives on the stack.
`CanonicalKmerOf::from_raw_unchecked(raw)` is the only other public constructor, for trusted paths such as deserialisation.
## Sliding window helpers
`push_right(nuc)` / `push_left(nuc)` shift the window by one base in O(1). `is_overlapping(other)` checks whether the last k1 nucleotides of `self` equal the first k1 of `other`.
## Hashing
`hash_kmer(raw: u64) -> u64` computes `mix64(raw ^ 0x9e3779b97f4a7c15)`, the seeded splitmix64 finalizer. `CanonicalKmerOf::seq_hash()` delegates to `hash_kmer`.
+97 -34
View File
@@ -26,13 +26,36 @@ Default mode is `Presence`. `Count` mode requires **all** source indexes to have
All source indexes must satisfy: All source indexes must satisfy:
- `IndexState::Indexed` (fully built — `index.done` sentinel present) - `IndexState::Indexed` (fully built — `index.done` sentinel present)
- Same `kmer_size`, `minimizer_size`, `n_bits` - Same `kmer_size`, `minimizer_size`, `n_partitions`
- Same evidence kind: all `Exact`, or all `Approx` with identical `(b, z)` parameters
- If `Count` mode: all sources must have `with_counts=true` - If `Count` mode: all sources must have `with_counts=true`
`--force`: if the output directory already exists, it is deleted before the merge begins. `--force`: if the output directory already exists, it is deleted before the merge begins.
--- ---
## Evidence compatibility
`validate_evidence_compat(sources)` is called before any I/O. It compares each source's `EvidenceKind` against `sources[0]`:
- All `Exact` → accepted, output uses `Exact`
- All `Approx { b, z }` with same `(b, z)` → accepted, output uses those parameters
- Any other combination → `OKIError::IncompatibleEvidence`, with a message directing the user to run `reindex` first
Mixed exact/approx is a hard error, not a silent conversion.
```rust
fn validate_evidence_compat(sources: &[&KmerIndex]) -> OKIResult<EvidenceKind>
```
---
## Genome label deduplication
`compute_labels(sources, rename_duplicates)` assigns final genome labels across all sources before any file is written. The first occurrence of a label keeps the original name. Subsequent occurrences receive `.1`, `.2`, … suffixes when `rename_duplicates` is true, or trigger `OKIError::DuplicateGenomeLabel` otherwise.
---
## Algorithm ## Algorithm
### 1. Validation ### 1. Validation
@@ -41,36 +64,72 @@ Check all sources against the constraints above. Abort on any mismatch.
### 2. Bootstrap output from first source ### 2. Bootstrap output from first source
Recursive file copy of `sources[0]``output`. This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genome is genome 0 in the output. Recursive file copy of `sources[0]``output`. Immediately after the copy:
- `index.meta` is rewritten with the final genome list (all sources, possibly renamed) and the effective evidence kind.
- In `Presence` mode, any `counts/` directories inherited from source_0 are removed.
- `spectrums/` from source_0 is removed and rebuilt from scratch across all sources, applying the (possibly renamed) labels.
This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genomes occupy columns 0 … `n_dst_genomes - 1` in the destination.
### 3. For each subsequent source (parallel across partitions) ### 3. For each subsequent source (parallel across partitions)
For each partition, process one source at a time sequentially: `KmerPartition::merge_partition(i, sources, mode, n_dst_genomes, block_bits)` is called for each partition index `i`. `block_bits` is taken from `dst.meta.config.block_bits`.
**a. Classify kmers** Each entry in `sources` is `(&KmerPartition, n_genomes)` where `n_genomes` is the column count that source contributes (> 1 when the source is itself a merged index).
Iterate all kmers in the source partition (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`: **First merge, Presence mode**: when `n_dst_genomes == 1`, `Layer::<()>::init_presence_matrix` is called on every existing destination layer before any source column is appended. This creates `presence/col_000000.pbiv` set all-true (genome 0 is present in every slot).
- **Hit** `(dst_layer, slot)`: record `(dst_layer, slot, value)` where `value` is the source count (Count mode) or `1` (Presence mode). **Pass 1 — classify kmers**
- **Miss**: add kmer to a `GraphDeBruijn` accumulator; record its value in a `HashMap<CanonicalKmer, Vec<u32>>`.
**b. Extend existing layers** Iterate all kmers from all source partitions (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`:
For each existing layer in the destination partition, call `append_genome_column` once per source genome being merged. Slots that received a hit are populated with their recorded value; all other slots receive 0 (absent). - **Hit**: kmer already in the destination; record for Pass 2.
- **Miss**: push kmer into a `GraphDeBruijn` accumulator.
If this is the **first merge** and operating in Presence mode, call `Layer<()>::init_presence_matrix` before appending any source column. This creates `presence/` with `col_000000.pbiv` set all-true (genome 0 is present in every slot of this layer). **New layer construction**
**c. Build new layer for new kmers** If the accumulator is non-empty, compute de Bruijn unitigs and call `Layer::<()>::build(&new_layer_dir, block_bits)`. All kmers absent from the destination — across **all** sources — accumulate into a **single** graph, producing one new layer per merge operation (not one per source).
If the `GraphDeBruijn` accumulator is non-empty (misses exist), write compact unitigs from the de Bruijn graph, then call the appropriate `Layer::build` variant. Before appending the source's own column, prepend `n_existing_genomes` absent columns (value 0) — one per genome already in the index — so the column count invariant holds immediately after layer creation. **Pass 2 — fill column builders**
**d. Update partition metadata** For each source and each of its layers, re-iterate unitigs and look up stored values via `SrcLayerData::lookup(kmer, src_n)`:
Write updated `meta.json` with the incremented `n_layers` if a new layer was added. - `SrcLayerData::SetMembership` — no data directory exists; every kmer returns `vec![1; n_genomes]`
- `SrcLayerData::Presence` — reads `PersistentBitMatrix` from `presence/`
- `SrcLayerData::Count` — reads `PersistentCompactIntMatrix` from `counts/`
Hits are routed to `exist_builders[dst_layer][src_col]`; misses are routed to `new_src_builders[src_col]`.
**Column prepending for new layers**
Before source columns are written to the new layer, `n_dst_genomes` absent columns (all-zero / all-false) are prepended — one per genome already in the index — so the column count invariant holds immediately after layer creation.
**Close and update metadata**
Close all builders; update `presence/meta.json` or `counts/meta.json` with `{"n": N, "n_cols": n_dst_genomes + n_src_total}`; increment `PartitionMeta::n_layers` if a new layer was added.
### 4. Update index metadata ### 4. Update index metadata
Append the merged source's genome label(s) to `index.meta.genomes`. Write updated `index.meta`. `index.meta` was already written during bootstrap with the complete genome list and evidence kind. No further update is needed after the partition loop.
---
## `append_genome_column`
Defined on two concrete specialisations of `Layer<D>`:
```rust
impl Layer<PersistentCompactIntMatrix> {
pub fn append_genome_column(layer_dir: &Path, value_of: impl Fn(usize) -> u32) -> OLMResult<()>
}
impl Layer<PersistentBitMatrix> {
pub fn append_genome_column(layer_dir: &Path, value_of: impl Fn(usize) -> bool) -> OLMResult<()>
}
```
Each appends one column file to the matrix subdirectory (`counts/` or `presence/`). In `merge_partition`, columns are written directly via `PersistentBitVecBuilder` / `PersistentCompactIntVecBuilder` rather than through these helpers, but the invariant they enforce is the same.
--- ---
@@ -78,27 +137,31 @@ Append the merged source's genome label(s) to `index.meta.genomes`. Write update
After any merge, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the total genome count in the index at that point. After any merge, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the total genome count in the index at that point.
This is maintained by three mechanisms: Maintained by three mechanisms:
1. **Existing layers**: one column appended per source genome (`append_genome_column`). 1. **Existing layers**: `n_src_total` columns appended (one per source genome).
2. **New layers created during merge**: `n_existing_genomes` absent columns prepended before the source's own column. 2. **New layers created during merge**: `n_dst_genomes` absent columns prepended before source columns.
3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0 before any source column is appended. 3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0.
The invariant is a precondition of the `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count. The invariant is a precondition of `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count.
--- ---
## New layer construction ## Error variants relevant to merge
All kmers absent from the destination index — across **all** sources being merged — accumulate into a **single** `GraphDeBruijn` per partition. One new layer is built from this graph, not one layer per source. This keeps the layer count bounded and preserves compact unitig representation. | Variant | Condition |
|---|---|
De Bruijn reconstruction ensures that overlapping k-1 suffixes/prefixes from different source kmers are merged into minimal unitigs before MPHF construction. | `OKIError::NotIndexed(path)` | Source not in `Indexed` state |
| `OKIError::IncompatibleConfig` | Mismatched `kmer_size`, `minimizer_size`, or `n_partitions` |
| `OKIError::MismatchedMode` | Count mode but a source has `with_counts=false` |
| `OKIError::IncompatibleEvidence(msg)` | Mixed exact/approx or different approx `(b, z)` |
| `OKIError::DuplicateGenomeLabel(label)` | Duplicate label and `rename_duplicates=false` |
--- ---
## On-disk impact ## On-disk impact
After merging `G` genomes (1 existing + G-1 new sources): After merging `G` genomes (sources_0 contributes `G0`, subsequent sources the rest):
``` ```
partitions/ partitions/
@@ -106,28 +169,28 @@ partitions/
index/ index/
meta.json ← n_layers updated if new layer added meta.json ← n_layers updated if new layer added
layer_0/ layer_0/
mphf.bin ← unchanged (MPHF immutable) mphf.bin ← unchanged
unitigs.bin ← unchanged unitigs.bin ← unchanged
evidence.bin ← unchanged evidence.bin ← unchanged
presence/ ← created on first merge (Presence mode) presence/ ← created on first merge (Presence mode)
meta.json {"n": N, "n_cols": G} meta.json {"n": N, "n_cols": G}
col_000000.pbiv ← all-true (genome 0) col_000000.pbiv ← all-true (genome 0 … G0-1)
col_000001.pbiv ← source 1 presence col_000001.pbiv ← next source
... ...
counts/ ← extended (Count mode) counts/ ← extended (Count mode)
meta.json {"n": N, "n_cols": G} meta.json {"n": N, "n_cols": G}
col_000000.pciv ← genome 0 counts (from original build) col_000000.pciv ← genome 0 counts (from original build)
col_000001.pciv ← source 1 counts col_000001.pciv ← next source
... ...
layer_1/ ← new layer (if new kmers found) layer_N/ ← new layer (if new kmers found)
mphf.bin mphf.bin
unitigs.bin unitigs.bin
evidence.bin evidence.bin
presence/ or counts/ presence/ or counts/
meta.json {"n": N1, "n_cols": G} meta.json {"n": N1, "n_cols": G}
col_000000.pbiv ← all-false (genome 0 absent from this layer) col_000000.pbiv ← all-false (absent for existing genomes)
... ...
col_000001.pbiv ← source 1 presence in this new layer spectrums/
<label>.json ← one file per genome, rebuilt from all sources
index.meta ← complete genome list + evidence kind written at bootstrap
``` ```
The `index.meta` genome list grows from 1 entry to G entries after all sources are merged.
+47 -8
View File
@@ -27,10 +27,10 @@ part_XXXXX/
After filtering (applying a min-count threshold derived from the spectrum) and building the local De Bruijn graph + unitigs (see [Construction pipeline](pipeline.md)), the exact filtered kmer set is available via `unitigs.bin`. After filtering (applying a min-count threshold derived from the spectrum) and building the local De Bruijn graph + unitigs (see [Construction pipeline](pipeline.md)), the exact filtered kmer set is available via `unitigs.bin`.
`MphfLayer::build` is called on the unitig file: `MphfLayer::build(dir, block_bits, fill_slot)` is called on the unitig directory:
1. **Pass 1**: iterate all canonical kmers from `unitigs.bin` in parallel, build and store `mphf.bin` (ptr_hash). 1. **Pass 1**: build `.idx` via `build_unitig_idx(unitig_path, block_bits)`, then iterate all canonical kmers in parallel over chunks using `(0..unitigs.len()).into_par_iter()` + `unitigs.unitig(ci).into_canonical_kmers()`. Constructs and stores `mphf.bin` (ptr_hash, `new_from_par_iter`).
2. **Pass 2**: iterate sequentially, fill `evidence.bin`, call the mode-specific `fill_slot` callback. 2. **Pass 2**: iterate sequentially with `iter_indexed_canonical_kmers`; fill `evidence.bin`; call `fill_slot(slot, kmer)` callback once per kmer for DataStore population.
`mphf1.bin` and `counts1.bin` are no longer needed after phase 2 and can be deleted. `mphf1.bin` and `counts1.bin` are no longer needed after phase 2 and can be deleted.
@@ -105,9 +105,12 @@ Each layer is a self-contained unit. See [obilayeredmap](obilayeredmap.md) for t
``` ```
layer_i/ layer_i/
unitigs.bin — packed 2-bit nucleotide sequences (kmer evidence) unitigs.bin — packed 2-bit nucleotide sequences (kmer evidence source)
unitigs.bin.idx — random-access block index (block_bits controls granularity)
mphf.bin — ptr_hash phase-2 MPHF mphf.bin — ptr_hash phase-2 MPHF
evidence.bin — n × u32: (chunk_id: 25 bits | rank: 7 bits) per slot evidence.bin — n × (chunk_id: 25 bits | rank: 7 bits) per slot [exact mode]
fingerprint.bin — n × b-bit fingerprints per slot [approx mode]
layer_meta.json — evidence kind, recorded at build time
``` ```
Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0 is built from dataset A. Adding dataset B: Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0 is built from dataset A. Adding dataset B:
@@ -116,9 +119,45 @@ Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0
2. Collect kmers of B not present in any layer → set `B \ A`. 2. Collect kmers of B not present in any layer → set `B \ A`.
3. Build layer 1 from `B \ A` (dereplicate → count → De Bruijn → unitigs → `MphfLayer::build`). 3. Build layer 1 from `B \ A` (dereplicate → count → De Bruijn → unitigs → `MphfLayer::build`).
### Evidence modes
Two evidence modes are supported, selected at build time via `EvidenceKind` and recorded in `layer_meta.json`.
**Exact** (`EvidenceKind::Exact`): `evidence.bin` stores one `(chunk_id, rank)` pair per MPHF slot, encoding the position of the corresponding kmer in `unitigs.bin`. Membership verification reconstructs the kmer from `(chunk_id, rank)` and compares it to the query. Zero false positives. Requires `.idx` for random access.
**Approx** (`EvidenceKind::Approx { b, z }`): `fingerprint.bin` stores a `b`-bit hash of the kmer at each MPHF slot. Membership check compares `kmer.seq_hash()` against the stored fingerprint. False-positive rate: 1/2^b per query. No `.idx` is written or needed.
### Build functions
```
MphfLayer::build(dir, block_bits, fill_slot)
Pass 1: par_iter over chunks via .idx → build mphf.bin
Pass 2: sequential iter → fill evidence.bin + call fill_slot
MphfLayer::build_exact_evidence(dir, block_bits)
Standalone post-hoc: builds evidence.bin + .idx from existing mphf.bin + unitigs.bin
Uses open_sequential(); no .idx required on entry
MphfLayer::build_approx_evidence(dir, b, z)
Standalone post-hoc: builds fingerprint.bin from existing mphf.bin + unitigs.bin
Uses open_sequential(); never writes .idx
MphfLayer::build_evidence(dir, kind, block_bits)
Dispatch wrapper: routes to build_exact_evidence or build_approx_evidence
```
`build` always produces exact evidence. If approximate evidence is needed (e.g. `EvidenceKind::Approx`), the caller invokes `build_approx_evidence` after `build` to replace the evidence bundle.
In `obikpartitionner`, `build_index_layer` receives `block_bits: u8` from `IndexConfig::block_bits` and forwards it directly to `Layer::build` and `Layer::build_approx_evidence`.
### Membership verification ### Membership verification
ptr_hash maps any input to a valid slot — it does not natively detect absent keys. Membership is verified using the evidence entry: decode the kmer from `(chunk_id, rank)` and compare to the query. A mismatch means the kmer is absent from this layer; probe the next layer. ptr_hash maps any input to a valid slot — it does not natively detect absent keys. Membership is verified using the evidence entry:
- **Exact**: decode `(chunk_id, rank)` from `evidence.bin`; reconstruct the kmer via `unitigs.verify_canonical_kmer`; compare to query.
- **Approx**: compare `kmer.seq_hash()` to the b-bit fingerprint stored at the slot.
A mismatch in either mode means the kmer is absent from this layer; probe the next layer.
### Query algorithm ### Query algorithm
@@ -126,12 +165,12 @@ ptr_hash maps any input to a valid slot — it does not natively detect absent k
fn query(kmer) → Option<(layer_index, slot)>: fn query(kmer) → Option<(layer_index, slot)>:
for (i, layer) in layers.iter().enumerate(): for (i, layer) in layers.iter().enumerate():
slot = layer.mphf.index(kmer) slot = layer.mphf.index(kmer)
if layer.evidence.decode(slot) matches kmer: if layer.evidence.matches(slot, kmer): // exact or approx dispatch
return Some((i, slot)) return Some((i, slot))
return None return None
``` ```
Expected probe depth: 1 for kmers in layer 0. Each probe is a ptr_hash lookup (~10 ns) plus one evidence decode. `MphfLayer::find` dispatches transparently to `find_exact` or `find_approx` based on the evidence loaded at `open` time. Expected probe depth: 1 for kmers in layer 0. Each probe is a ptr_hash lookup (~10 ns) plus one evidence check.
### Merging layers ### Merging layers
+52 -20
View File
@@ -1,16 +1,26 @@
# obipipeline — parallel pipeline library # obipipeline — parallel pipeline library
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler. `obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **stages**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
## Core types ## Core types
| Type alias | Rust type | Role | | Type alias | Rust type | Role |
|---|---|---| |---|---|---|
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send+Sync>` | Called repeatedly; `FnMut` because it holds iterator state | | `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send>` | Called repeatedly; `FnMut` because it holds iterator state |
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send+Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) | | `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | 1→1 transform shared across workers via `Arc::clone` |
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send+Sync>` | Final consumer; returns `Result` so errors propagate back | | `SharedFlatFn<D>` | `Arc<dyn Fn(D, &Sender<Result<D, _>>, &Sender<isize>) + Send + Sync>` | 1→N transform; pushes items into channel, sends delta |
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send>` | Final consumer; returns `Result` so errors propagate back |
`Pipeline<D>` holds one `SourceFn`, a `Vec<SharedFn>`, and one `SinkFn`. Stages come in two variants:
```rust
pub enum Stage<D> {
Transform(SharedFn<D>), // 1→1
Flat(SharedFlatFn<D>), // 1→N
}
```
`Pipeline<D>` holds one `SourceFn`, a `Vec<Stage>`, and one `SinkFn`.
`WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`. `WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
## WorkerPool ## WorkerPool
@@ -23,7 +33,7 @@ WorkerPool::run(self)
| Parameter | Role | | Parameter | Role |
|---|---| |---|---|
| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. | | `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. | | `capacity` | Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning. `run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
@@ -43,7 +53,7 @@ Each variant carries the concrete type for one stage's output. The macros patter
## Macros ## Macros
Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them. Eight low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
### Low-level ### Low-level
@@ -54,6 +64,9 @@ make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T
make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E> make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
make_flat_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> impl IntoIterator<Item=U>
make_flat_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<impl IntoIterator<Item=U>, E>
make_sink!(Enum, func, InputVariant) // func: T -> () make_sink!(Enum, func, InputVariant) // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E> make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
``` ```
@@ -66,8 +79,10 @@ Each macro wraps the closure in the correct smart pointer (`Box` for source/sink
make_pipeline! { make_pipeline! {
DataEnum, DataEnum,
source iterator => OutputVariant, // or source? for fallible source iterator => OutputVariant, // or source? for fallible
| func: In => Out, // non-fallible transform | func: In => Out, // 1→1 non-fallible transform
|? func: In => Out, // fallible transform |? func: In => Out, // 1→1 fallible transform
|| func: In => Out, // 1→N non-fallible flat transform
||? func: In => Out, // 1→N fallible flat transform
sink func @ InputVariant, // or sink? for fallible sink func @ InputVariant, // or sink? for fallible
} }
``` ```
@@ -75,12 +90,29 @@ make_pipeline! {
`?` marks fallibility on source, individual transforms, or sink independently. `?` marks fallibility on source, individual transforms, or sink independently.
Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`. Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
### make_pipe! DSL
`make_pipe!` builds a sourceless/sinkless `Pipe<D, In, Out>` — a reusable, composable stage sequence:
```
make_pipe! {
DataEnum : InType => OutType,
| func: InVariant => OutVariant,
|? func: InVariant => OutVariant,
|| func: InVariant => OutVariant,
||? func: InVariant => OutVariant,
}
```
Two pipes compose with `.then(other)`. Apply to an iterator with `.apply(iter, n_workers, capacity)` to get a `PipeIter<Out>` — an iterator over the pipeline output, backed by a background `WorkerPool`. The scatter step in `obikmer` uses `make_pipe!` and `.apply()` rather than the full `make_pipeline!` / `WorkerPool` pattern.
## Scheduler architecture ## Scheduler architecture
``` ```
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N) Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
▲ │ ▲ │
[stage_rxs] ────────┘◄──────────────────────────────┘ [stage_rxs] ────────┘◄──────────────────────────────┘
[flat_delta_rx] ──► Scheduler (in_flight adjustment)
[sink_err_rx] ← errors from sink (highest priority) [sink_err_rx] ← errors from sink (highest priority)
@@ -91,23 +123,23 @@ The scheduler is a single thread running a biased `Select` over all input channe
``` ```
index 0 sink_err_rx abort on sink error index 0 sink_err_rx abort on sink error
index 1 stage_rxs[N-1] drain last stage first index 1 flat_delta_rx adjust in_flight before dispatching
... index 2..=n+1 stage_rxs[n-1..0] drain last stage first
index N stage_rxs[0] index n+2 source_rx pull new data last
index N+1 source_rx pull new data last
``` ```
This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline. This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result. **Workers** are generic: each receives a `WorkerTask` — either `Transform(data, stage_idx)` or `Flat(data, stage_idx)`. For `Transform`, the worker calls `f(data)` and sends the result to `stage_txs[stage_idx]`. For `Flat`, the worker calls `f(data, &push_tx, &delta_tx)`: the closure pushes N items into `push_tx` then sends `N-1` to `delta_tx`. The scheduler uses the delta to adjust `in_flight` without knowing N in advance.
**Termination** uses an `in_flight` counter: **Termination** uses an `in_flight: isize` counter and a `flat_workers_active: usize` counter:
- incremented when an item is dispatched from source to workers - `in_flight` incremented when an item is dispatched from source to workers
- decremented when the item exits the last stage - `in_flight` decremented when the item exits the last stage to the sink
- the loop exits only when `source_done && in_flight == 0` - `flat_workers_active` incremented when a `Flat` task is dispatched, decremented when the delta arrives
- the loop exits only when `source_done && in_flight == 0 && flat_workers_active == 0`
This guarantees all in-flight items complete before `join()`. This guarantees all in-flight items complete (including all N outputs of a flat stage) before `join()`.
## Error handling ## Error handling
@@ -118,7 +150,7 @@ This guarantees all in-flight items complete before `join()`.
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) | | `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
| `TypeMismatch` | Wrong enum variant arrived at a stage | | `TypeMismatch` | Wrong enum variant arrived at a stage |
| `StepKindMismatch` | Internal routing error | | `StepKindMismatch` | Internal routing error |
| `StepError(Box<dyn Error>)` | Error from user code (wrapped by `make_*_fallible!`) | | `StepError(Box<dyn Error + Send + Sync>)` | Error from user code (wrapped by `make_*_fallible!`) |
Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error. Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
+68 -11
View File
@@ -88,9 +88,7 @@ Implemented as a three-step pipeline in `count_partition()`:
2. **Provisional MPHF** (ptr_hash): built from `sorted_unique.bin` via `new_from_par_iter(f0, ...)`. Stored to `mphf1.bin`; `sorted_unique.bin` deleted immediately. 2. **Provisional MPHF** (ptr_hash): built from `sorted_unique.bin` via `new_from_par_iter(f0, ...)`. Stored to `mphf1.bin`; `sorted_unique.bin` deleted immediately.
3. **Accumulation pass**: re-read dereplicated superkmers; for each kmer, `slot = mphf.index(kmer.raw())`, increment `counts1[slot]` by the superkmer COUNT. Stored in a `PersistentCompactIntVec` (`counts1.bin`). 3. **Accumulation pass**: re-read dereplicated superkmers; for each kmer, `slot = mphf.index(kmer.raw())`, increment `counts1[slot]` by the superkmer COUNT. Stored in a `PersistentCompactIntVec` (`counts1.bin`).
At the end of this phase, each distinct canonical kmer has its exact total count, and the frequency spectrum (`kmer_spectrum_raw.json`) is written. At the end of this phase, each distinct canonical kmer has its exact total count, and the frequency spectrum (`spectrums/{label}.json`) is written to the index root.
Abundance filter applied here: kmers with `total_count < q` are discarded. `q` is a collection parameter (0 = keep all, including singletons for ≤1x data).
No pre-filter on super-kmer COUNT is possible at phase 2: a super-kmer with COUNT=1 may contain only high-abundance kmers, each present in many other super-kmers across the partition. No pre-filter on super-kmer COUNT is possible at phase 2: a super-kmer with COUNT=1 may contain only high-abundance kmers, each present in many other super-kmers across the partition.
@@ -140,19 +138,70 @@ Output: `unitigs.bin` — the permanent evidence structure for the partition. Ea
## Phase 6 — MPHF construction and index finalisation ## Phase 6 — MPHF construction and index finalisation
Built once on the definitive kmer set (all kmers in all unitigs of the partition). See [obilayeredmap](obilayeredmap.md) and [MPHF selection](mphf.md) for the current implementation. `build_index_layer` is called per partition (in parallel via `build_layers`) with the following parameters sourced from `IndexConfig`:
- `block_bits` — from `IndexConfig::block_bits`; controls the `.idx` block size (2^block_bits unitig chunks per block) for exact evidence
- `evidence` — `EvidenceKind::Exact` or `EvidenceKind::Approx { b, z }`; propagated unchanged from `IndexConfig::evidence`
- `min_ab` / `max_ab` — abundance bounds applied before graph construction
- `with_counts` — whether to store kmer counts alongside set membership
**Abundance filtering:** when `min_ab > 1` or `max_ab.is_some()`, the provisional `mphf1.bin` and `counts1.bin` produced in phase 3 are memory-mapped. Each canonical kmer is accepted only if its count in `counts1` satisfies the bounds. If either file is absent, filtering is skipped (all kmers accepted).
``` ```
kmers from unitigs → MPHF → mphf.bin for each kmer in dereplicated super-kmer:
→ evidence.bin : n × u32, each = (chunk_id: 25 bits | rank: 7 bits) ab = counts1[mphf1.index(kmer.raw())]
→ payload : counts/ (mode 2) or presence/ (mode 3) if ab < min_ab || ab > max_ab: skip
graph.push(kmer)
``` ```
The MPHF is built in two passes over `unitigs.bin`: parallel pass for `mphf.bin`, sequential pass for `evidence.bin` and payload. The exact kmer count is available from the unitig index (`unitigs.bin.idx`) before the passes begin. **Graph build and unitig write:**
**Exact verification via unitig evidence:** The surviving kmers are fed into `GraphDeBruijn`, which computes degrees and yields unitigs. Unitigs are written to `layer_0/unitigs.bin` via a `UnitigFileWriter`.
`unitigs.bin` serves as the evidence structure. The MPHF maps every input to `[0, N)` including absent kmers — the unitig read-back (via `evidence.bin`) is the only correct membership test. **MPHF and evidence build:**
`Layer::build` (membership-only) or `Layer::<PersistentCompactIntMatrix>::build` (with counts) is called next. Internally, `MphfLayer::build` performs two passes:
1. **Pass 1 (parallel):** build `unitigs.bin.idx` (block size = 2^`block_bits`) then construct the MPHF from all canonical kmers in `unitigs.bin`; store to `mphf.bin`.
2. **Pass 2 (sequential):** for each kmer in `unitigs.bin`, compute its slot and write `evidence.bin` (`chunk_id: 25 bits | rank: 7 bits` packed into a `u32`); also invoke the payload callback (`fill_slot`) to populate `counts/` if `with_counts`.
After `Layer::build` completes, `layer_meta.json` records `EvidenceKind::Exact`.
**Approximate evidence override:**
If `evidence` is `EvidenceKind::Approx { b, z }`, `build_approx_evidence` is called immediately after `Layer::build`. It overwrites the exact evidence bundle with `fingerprint.bin` (b-bit hash per slot) and rewrites `layer_meta.json` with `EvidenceKind::Approx { b, z }`. No `.idx` file is needed at query time in this mode.
```
// Exact path → evidence.bin + unitigs.bin.idx + layer_meta.json(Exact)
// Approx path → fingerprint.bin + layer_meta.json(Approx{b,z})
// (evidence.bin left on disk but not used)
```
**Partition metadata:**
After all layer files are written, `PartitionMeta { n_layers: 1 }` is serialised to `index/meta.json` inside the partition directory. This file is required by `LayeredMap::open` for subsequent merge operations.
**File layout per partition after phase 6:**
```
part_XXXXX/
index/
meta.json ← PartitionMeta { n_layers: 1 }
layer_0/
unitigs.bin ← permanent evidence (all modes)
unitigs.bin.idx ← block index (exact mode only)
mphf.bin ← MPHF
evidence.bin ← exact evidence (exact mode)
fingerprint.bin ← b-bit fingerprints (approx mode)
layer_meta.json ← EvidenceKind tag
counts/ ← PersistentCompactIntMatrix (with_counts only)
```
**Cleanup:** unless `--keep-intermediate` is set, `remove_build_artifacts` deletes `dereplicated.skmer.zst`, `mphf1.bin`, and `counts1.bin` after all partitions are indexed.
See [obilayeredmap](obilayeredmap.md) and [MPHF selection](mphf.md) for data structure details.
**Query path (exact evidence):**
``` ```
query kmer q query kmer q
@@ -164,4 +213,12 @@ query kmer q
→ no match: kmer absent ← MPHF collision on absent kmer → no match: kmer absent ← MPHF collision on absent kmer
``` ```
`superkmers.bin.gz` is no longer needed at this point and can be deleted. **Query path (approximate evidence):**
```
query kmer q
→ MPHF(q) → slot s
→ fingerprint[s] matches seq_hash(q)?
→ yes : probable hit (FP rate = 1/2^b per kmer, 1/2^(b·z) per z-window)
→ no : kmer absent
```
+48 -45
View File
@@ -2,36 +2,47 @@
## Memory layout ## Memory layout
A super-kmer is stored as a **32-bit header** followed by a **byte-aligned nucleotide sequence** (2 bits/base, nucleotide 0 at the MSB of the first byte): `SuperKmer` holds two separate fields:
| Field | Bits | Role |
|-------|------|------|
| COUNT | 24 | Occurrence count (≤ 16 M) |
| NKMERS | 8 | Number of kmers (= seq_length k + 1, range 1255) |
Bit layout (MSB to LSB): `[31:8] COUNT [7:0] NKMERS`
NKMERS is stored as a raw `u8` in **kmer units**, not nucleotides. The nucleotide length is recovered as `NKMERS + k 1`. This avoids the awkward wrapping convention (`0 = 256`) that would be needed if nucleotide length were stored directly, and gains k1 = 30 units of headroom:
| unit | u8 covers | max nucleotides |
|---|---|---|
| nucleotides | 255 nt | 225 kmers |
| **kmers** | **255 kmers** | **285 nt** |
The public accessors:
```rust ```rust
fn n_kmers(&self) -> usize { (self.0 & 0xFF) as usize } pub struct SuperKmer {
fn seql(&self) -> usize { self.n_kmers() + K - 1 } pub(crate) count: u32,
fn count(&self) -> u32 { self.0 >> 8 } pub(crate) inner: PackedSeq,
fn increment(&mut self) { self.0 += 1 << 8; } }
fn add(&mut self, n: u32) { self.0 += n << 8; }
fn set_count(&mut self, n: u32) { self.0 = (self.0 & 0xFF) | (n << 8); }
``` ```
In practice, observed super-kmer lengths on metagenomic data (k=31) are below 55 nucleotides (≤ 25 kmers) — far from the 255-kmer cap. If a super-kmer ever exceeds 255 kmers, it is split with a k1 nucleotide overlap, preserving all kmers without duplication (identical mechanism to partition-boundary splits). `PackedSeq` stores a 2-bit packed DNA sequence as a heap-allocated `Box<[u8]>` plus a `tail: u8` field:
The sequence is always stored in canonical form (lexicographic minimum of forward and reverse complement), with nucleotide 0 at the MSB of the first byte. The byte array can be hashed directly without any adjustment. | Field | Type | Role |
|-------|------|------|
| `tail` | `u8` | Number of valid nucleotides in the last byte: 0 encodes 4, 13 are identity |
| `seq` | `Box<[u8]>` | 2-bit packed bytes, nucleotide 0 at bits 76 of `seq[0]` |
Nucleotide length is recovered without storing it explicitly:
```text
seql = (seq.len() - 1) * 4 + tail_count(tail)
```
There is no packed header word — `count` and the sequence live in separate fields.
The on-disk binary format (produced by `write_to_binary`) is:
```text
[varint(count)] [u8: seql k] [packed bytes…]
```
`seql k` fits in a `u8` when `n_kmers = seql k + 1 ≤ MAX_KMERS_PER_CHUNK (= 256)`. If a super-kmer exceeds 256 kmers, `write_to_binary` splits it into overlapping chunks (k1 nucleotide overlap, same count per chunk), each a self-contained record readable by `read_from_binary`.
The public accessors operate on the struct fields directly:
```rust
fn seql(&self) -> usize { self.inner.seql() }
fn count(&self) -> u32 { self.count }
fn increment(&mut self) { self.count += 1; }
fn add(&mut self, n: u32) { self.count += n; }
fn set_count(&mut self, n: u32) { self.count = n; }
```
## ASCII encoding and decoding ## ASCII encoding and decoding
@@ -72,7 +83,7 @@ const fn revcomp4(x: u8) -> u8 {
**Step 2 — realignment.** After step 1, `padding = n × 8 seql × 2` spurious bits (complements of the original padding A's) appear at the start of the array. They are flushed left using `BitSlice<u8, Msb0>::rotate_left(padding)` from the `bitvec` crate, which is SIMD-accelerated. The trailing `padding` bits are then zeroed: **Step 2 — realignment.** After step 1, `padding = n × 8 seql × 2` spurious bits (complements of the original padding A's) appear at the start of the array. They are flushed left using `BitSlice<u8, Msb0>::rotate_left(padding)` from the `bitvec` crate, which is SIMD-accelerated. The trailing `padding` bits are then zeroed:
```rust ```rust
let seql = self.n_kmers() + k - 1; let seql = self.seql();
shift = n * 8 - seql * 2 // number of padding bits shift = n * 8 - seql * 2 // number of padding bits
bits.rotate_left(shift) bits.rotate_left(shift)
bits[len - shift..].fill(false) bits[len - shift..].fill(false)
@@ -93,7 +104,7 @@ bits[len - shift..].fill(false)
## Minimizer sliding window ## Minimizer sliding window
Super-kmers are built by `SuperKmerIter` (crate `obiskbuilder`), which maintains the current minimizer with a **monotonic deque** over a sliding window of W = k m + 1 m-mer positions. Super-kmers are built by `SuperKmerIter` (crate `obiskbuilder`), which tracks the current minimizer with a **monotonic deque** (`Ring<MmerItem, 32>`) inside `RollingStat`, a rolling-window entropy and minimizer tracker.
Each deque entry stores: Each deque entry stores:
@@ -101,20 +112,9 @@ Each deque entry stores:
|------------|-------|----------------------------------------------| |------------|-------|----------------------------------------------|
| `position` | usize | 0-based start of this m-mer in the segment | | `position` | usize | 0-based start of this m-mer in the segment |
| `canonical`| u64 | right-aligned canonical m-mer value (lex-min of fwd and rc); used as partition key | | `canonical`| u64 | right-aligned canonical m-mer value (lex-min of fwd and rc); used as partition key |
| `hash` | u64 | $H(\text{canonical})$ — ordering key for random minimizer selection | | `hash` | u64 | `hash_kmer(canonical << (64 2m))` — ordering key for random minimizer selection |
The hash $H$ is the seeded splitmix64 finalizer (see [Minimizer selection](../theory/minimizer.md)): The hash uses the seeded splitmix64 finalizer (`mix64(raw ^ 0x9e3779b97f4a7c15)`), the same function as `kmer::hash_kmer`.
```rust
fn hash_mmer(canonical: u64) -> u64 {
let x = canonical ^ 0x9e3779b97f4a7c15; // seed: eliminates fixed point at 0
let x = x ^ (x >> 30);
let x = x.wrapping_mul(0xbf58476d1ce4e5b9);
let x = x ^ (x >> 27);
let x = x.wrapping_mul(0x94d049bb133111eb);
x ^ (x >> 31)
}
```
On each new nucleotide, once the window is full, the deque is updated: On each new nucleotide, once the window is full, the deque is updated:
@@ -133,24 +133,27 @@ On each new nucleotide, once the window is full, the deque is updated:
The front of the deque is always the current minimizer. Because the deque is maintained in strictly increasing hash order, each entry is popped at most once — O(1) amortized per nucleotide. The front of the deque is always the current minimizer. Because the deque is maintained in strictly increasing hash order, each entry is popped at most once — O(1) amortized per nucleotide.
A super-kmer boundary is emitted when the minimizer changes: `deque.front.hash ≠ prev_hash`. The `canonical` field of the front entry is **not** used for boundary detection — that uses the hash alone. The canonical value is stored so that the partition key $H(\text{canonical})$ can be recomputed independently at routing time from the stored `minimizer_pos`, without inheriting the minimum-order-statistic bias (see [Minimizer selection — partition key independence](../theory/minimizer.md#partition-key-independence)). A super-kmer boundary is emitted when the minimizer changes: `current_minimizer != prev_minimizer`. `SuperKmerIter` also emits a boundary when:
- entropy of the current k-mer falls at or below the threshold θ (cursor retreated by k1)
- super-kmer length reaches 256 nucleotides (cursor retreated by k)
## Kmer extraction ## Kmer extraction
A k-mer is extracted from a super-kmer with `SuperKmer::kmer(i, k)`, which returns a `Kmer` — a left-aligned `u64` newtype (see [Kmer implementation](kmer.md)): A k-mer is extracted from a super-kmer with `SuperKmer::kmer(i)`, which delegates to `PackedSeq::extract::<KLen>(i)` and returns a `Kmer` — a left-aligned `u64` newtype (see [Kmer implementation](kmer.md)):
```rust ```rust
pub fn kmer(&self, i: usize, k: usize) -> Result<Kmer, KmerError> pub fn kmer(&self, i: usize) -> Result<Kmer, KmerError>
``` ```
The bit slice `seq[i*2 .. (i+k)*2]` (Msb0 order) is loaded as a big-endian `u64` via `bitvec::load_be`, then left-shifted to produce the canonical left-aligned layout. One call — no loop, no allocation. The bit slice `seq[i*2 .. (i+k)*2]` (Msb0 order) is loaded as a `u64` via `bitvec::load_be`, then left-shifted to produce the canonical left-aligned layout. One call — no loop, no allocation.
--- ---
!!! abstract "Algorithm — Super-kmer reverse complement" !!! abstract "Algorithm — Super-kmer reverse complement"
```text ```text
procedure SuperKmerRevcomp(seq, SEQL): procedure SuperKmerRevcomp(seq, SEQL):
seql ← NKMERS + k 1 -- nucleotide length seql ← nucleotide length
n ← ⌈seql / 4⌉ -- number of bytes n ← ⌈seql / 4⌉ -- number of bytes
shift ← n × 8 seql × 2 -- padding bits to flush shift ← n × 8 seql × 2 -- padding bits to flush
+4 -3
View File
@@ -174,7 +174,8 @@ Evidence cost decreases by 1 bit/kmer with each doubling of partition count (via
--- ---
## Open questions ## Alternative: fingerprint evidence
- **Cross-partition set operations**: strategy B allows unitig-level operations (mark entire chunks present/absent) rather than kmer-level, reducing cost by a factor of m_u. `evidence.bin` can be replaced by `fingerprint.bin` at index build time (`--approx`) or after the fact (`reindex --approx`). The fingerprint stores b bits per MPHF slot (the low b bits of `kmer.seq_hash()`); verification becomes a single bitfield comparison instead of a unitig dereference. False-positive rate per k-mer query: 1/2^b. With the Findere z parameter, z consecutive k-mers must all match, reducing the effective window FP rate to 1/2^(b·z) while skipping z1 of every z k-mers. No `.idx` file is written or read in approx mode.
- **Eliminating evidence.bin**: at ~66% of per-layer lookup footprint, `evidence.bin` dominates index size. See [Evidence elimination design discussion](evidence_elimination.md).
See [Approximate evidence (Findere fingerprint)](evidence_elimination.md) for the full design and CLI parameters.
+18
View File
@@ -2,10 +2,28 @@
`obikmer` is a Rust tool for manipulation, counting, indexing, and set operations on DNA sequences represented as kmer sets. `obikmer` is a Rust tool for manipulation, counting, indexing, and set operations on DNA sequences represented as kmer sets.
## Subcommands
| Subcommand | Purpose |
|-------------|---------|
| `superkmer` | Extract super-kmers from a sequence file and write to stdout |
| `index` | Build a complete genome index (scatter → dereplicate → count → layered MPHF) |
| `merge` | Merge multiple built indexes into one |
| `rebuild` | Filter and compact an existing index into a new single-layer index |
| `query` | Query an index with sequences and annotate matches |
| `dump` | Dump all indexed kmers as CSV (kmer + per-genome counts or presence) |
| `annotate` | Add or update genome metadata from a CSV file; or dump metadata as CSV |
| `distance` | Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees |
| `unitig` | Dump unitigs from a built index to stdout (debug) |
| `estimate` | Estimate approximate-index parameters (z, evidence bits, FP rates) before indexing |
| `reindex` | Convert an index's evidence in-place: exact ↔ approx |
## Constraints ## Constraints
- Target scale: individual genome datasets, tens of Gbases - Target scale: individual genome datasets, tens of Gbases
- Maximum efficiency in computation, memory, and disk usage - Maximum efficiency in computation, memory, and disk usage
- k odd, k ∈ [11, 31], fixed at runtime; kmer fits in a u64 (2 bits/base)
- Canonical form: `min(kmer, revcomp(kmer))` reduces strand-symmetric space by half
- Input formats: FASTA, FASTQ, gzip, streaming stdin - Input formats: FASTA, FASTQ, gzip, streaming stdin
## Priority operations ## Priority operations
+1 -1
View File
@@ -9,7 +9,7 @@ A **kmer** is a DNA subsequence of fixed length k. Two constraints govern the ch
## Super-kmers ## Super-kmers
A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k1 nucleotides. Each kmer of the run carries the same **canonical minimizer**. The **canonical minimizer** of a kmer is the smallest value of `min(m-mer, revcomp(m-mer))` over all m-mers within the kmer (m < k, m odd), with the constraint that **non-degenerate m-mers are always preferred** over degenerate ones. A degenerate m-mer is one composed of a single repeated nucleotide (all-A, all-C, all-G, or all-T); such m-mers are selected only if no non-degenerate candidate exists in the window. A **super-kmer** is a maximal run of consecutive kmers from a DNA read, each overlapping the next by k1 nucleotides, sharing the same **canonical minimizer**. The **canonical minimizer** of a kmer is the m-mer (m < k) whose canonical hash `hash_kmer(min(m-mer, revcomp(m-mer)))` is smallest over all m-mers in the kmer window. The hash function is a `mix64`-based bijection; selection is purely hash-ordered with no degeneracy filter. A super-kmer is capped at 256 nucleotides; a longer run is split at that boundary.
### Canonical super-kmers ### Canonical super-kmers
+10 -6
View File
@@ -17,18 +17,22 @@ The Watson-Crick complement of any base is its bitwise NOT on 2 bits: `complemen
A kmer fits in a single `u64`. Nucleotide 0 occupies bits 6362, nucleotide i occupies bits 632i and 622i, and the low 642k bits are zero. Extraction of nucleotide i (0 ≤ i < k): `(kmer >> (62 - 2*i)) & 0b11`. A kmer fits in a single `u64`. Nucleotide 0 occupies bits 6362, nucleotide i occupies bits 632i and 622i, and the low 642k bits are zero. Extraction of nucleotide i (0 ≤ i < k): `(kmer >> (62 - 2*i)) & 0b11`.
Reverse complement is computed via a **16-bit lookup table** (65 536 entries × 2 bytes = 128 KB, fits in L2 cache) storing the reverse-complement of every 8-base chunk. Reverse complement is computed by **bit manipulation in four steps**, with no lookup table:
!!! abstract "Algorithm — Kmer reverse complement" !!! abstract "Algorithm — Kmer reverse complement"
```text ```text
procedure KmerRevcomp(kmer, k): procedure KmerRevcomp(kmer, k):
raw ← TABLE16[kmer & 0xFFFF] << 48 x ← ~kmer -- complement all bases
| TABLE16[(kmer >> 16) & 0xFFFF] << 32 x ← swap_bytes(x) -- reverse byte order
| TABLE16[(kmer >> 32) & 0xFFFF] << 16 x ← ((x >> 4) & 0x0F0F0F0F0F0F0F0F)
| TABLE16[(kmer >> 48) & 0xFFFF] | ((x & 0x0F0F0F0F0F0F0F0F) << 4) -- swap nibbles within each byte
return raw << (64 - 2*k) x ← ((x >> 2) & 0x3333333333333333)
| ((x & 0x3333333333333333) << 2) -- swap 2-bit pairs within each nibble
return x << (64 - 2*k) -- re-align to MSB
``` ```
The three reorder passes together reverse the order of all 2-bit base codes across the 64-bit word. The bitwise NOT in the first step complements each base (A↔T, C↔G). The final left shift clears the low 642k padding bits.
The **canonical form** is the lexicographic minimum of the kmer and its reverse complement: The **canonical form** is the lexicographic minimum of the kmer and its reverse complement:
``` ```
+1 -1
View File
@@ -40,7 +40,7 @@ The filter computes $\hat{H}(ws)$ for each word size ws from 1 to ws_max and ret
$$\text{entropy}(kmer) = \min_{ws=1}^{ws_{\max}} \hat{H}(ws)$$ $$\text{entropy}(kmer) = \min_{ws=1}^{ws_{\max}} \hat{H}(ws)$$
A value near 0 indicates low complexity (e.g. AAAA…); near 1 indicates high complexity. A kmer is rejected if $\text{entropy}(kmer) \leq \theta$, where $\theta$ is a collection parameter. The minimum across word sizes ensures that any scale of repetition is detected independently: polyA is caught at ws=1, dinucleotide repeats at ws=2, etc. A value near 0 indicates low complexity (e.g. AAAA…); near 1 indicates high complexity. A kmer is rejected if $\text{entropy}(kmer) < \theta$, where $\theta$ is a collection parameter (default 0.7). The minimum across word sizes ensures that any scale of repetition is detected independently: polyA is caught at ws=1, dinucleotide repeats at ws=2, etc.
## Interpretation as an effective number of classes ## Interpretation as an effective number of classes
+2 -2
View File
@@ -44,7 +44,7 @@ impl KmerPartition {
if !layer_dir.exists() { break; } if !layer_dir.exists() { break; }
l += 1; l += 1;
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?; let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?; let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
@@ -97,7 +97,7 @@ impl KmerPartition {
let layer_dir = index_dir.join(format!("layer_{layer}")); let layer_dir = index_dir.join(format!("layer_{layer}"));
if !layer_dir.exists() { break; } if !layer_dir.exists() { break; }
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?; let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?; let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
+2 -8
View File
@@ -110,7 +110,7 @@ impl KmerPartition {
uw.close()?; uw.close()?;
if with_counts { if with_counts {
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, |kmer| { Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, evidence, |kmer| {
match (&mphf1_opt, &counts1_opt) { match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
_ => 1, _ => 1,
@@ -118,13 +118,7 @@ impl KmerPartition {
}) })
.map_err(olm_to_sk)?; .map_err(olm_to_sk)?;
} else { } else {
Layer::<()>::build(&layer_dir, block_bits).map_err(olm_to_sk)?; Layer::<()>::build(&layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
}
// For approximate evidence: replace the exact evidence bundle with a
// fingerprint. For exact evidence, build() already wrote it.
if let EvidenceKind::Approx { b, z } = evidence {
Layer::<()>::build_approx_evidence(&layer_dir, *b, *z).map_err(olm_to_sk)?;
} }
// Write meta.json in the index/ directory so LayeredMap::open works // Write meta.json in the index/ directory so LayeredMap::open works
+4 -4
View File
@@ -9,7 +9,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentCompactIntVecBuilder}; PersistentCompactIntVecBuilder};
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obiskio::{SKError, SKResult, UnitigFileReader}; use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError}; use obilayeredmap::{EvidenceKind, Layer, LayeredMap, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
@@ -195,7 +195,7 @@ impl KmerPartition {
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let unitigs_path = src_index_dir let unitigs_path = src_index_dir
.join(format!("layer_{l}")).join("unitigs.bin"); .join(format!("layer_{l}")).join("unitigs.bin");
let reader = UnitigFileReader::open(&unitigs_path)?; let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if dst_map.query(kmer).is_none() { if dst_map.query(kmer).is_none() {
g.push(kmer); g.push(kmer);
@@ -217,7 +217,7 @@ impl KmerPartition {
uw.write(&unitig)?; uw.write(&unitig)?;
} }
uw.close()?; uw.close()?;
Layer::<()>::build(&new_layer_dir, block_bits).map_err(olm_to_sk)?; Layer::<()>::build(&new_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
} }
drop(g); drop(g);
@@ -303,7 +303,7 @@ impl KmerPartition {
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let reader = UnitigFileReader::open(&src_layer_dir.join("unitigs.bin"))?; let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?; let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
+4 -4
View File
@@ -8,7 +8,7 @@ use obicompactvec::{PersistentBitMatrixBuilder,
PersistentCompactIntVecBuilder}; PersistentCompactIntVecBuilder};
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obiskio::{SKError, SKResult, UnitigFileReader}; use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{Layer, MphfLayer, OLMError}; use obilayeredmap::{EvidenceKind, Layer, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use crate::filter::KmerFilter; use crate::filter::KmerFilter;
@@ -116,7 +116,7 @@ impl KmerPartition {
let unitigs_path = src_layer_dir.join("unitigs.bin"); let unitigs_path = src_layer_dir.join("unitigs.bin");
if !unitigs_path.exists() { continue; } if !unitigs_path.exists() { continue; }
let reader = UnitigFileReader::open(&unitigs_path)?; let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?; let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
@@ -146,7 +146,7 @@ impl KmerPartition {
uw.close()?; uw.close()?;
drop(g); drop(g);
Layer::<()>::build(&dst_layer_dir, block_bits).map_err(olm_to_sk)?; Layer::<()>::build(&dst_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?; let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?;
// ── Prepare matrix builders (one column per genome) ─────────────────── // ── Prepare matrix builders (one column per genome) ───────────────────
@@ -181,7 +181,7 @@ impl KmerPartition {
let unitigs_path = src_layer_dir.join("unitigs.bin"); let unitigs_path = src_layer_dir.join("unitigs.bin");
if !unitigs_path.exists() { continue; } if !unitigs_path.exists() { continue; }
let reader = UnitigFileReader::open(&unitigs_path)?; let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?; let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
+8 -5
View File
@@ -102,8 +102,8 @@ impl<D: LayerData> Layer<D> {
// ── Mode 1 — set membership ─────────────────────────────────────────────────── // ── Mode 1 — set membership ───────────────────────────────────────────────────
impl Layer<()> { impl Layer<()> {
pub fn build(out_dir: &Path, block_bits: u8) -> OLMResult<usize> { pub fn build(out_dir: &Path, block_bits: u8, evidence_kind: &EvidenceKind) -> OLMResult<usize> {
MphfLayer::build(out_dir, block_bits, &mut |_, _| Ok(())) MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |_, _| Ok(()))
} }
/// Create a presence matrix for a set-membership layer (first merge). /// Create a presence matrix for a set-membership layer (first merge).
@@ -126,6 +126,7 @@ impl Layer<PersistentCompactIntMatrix> {
pub fn build( pub fn build(
out_dir: &Path, out_dir: &Path,
block_bits: u8, block_bits: u8,
evidence_kind: &EvidenceKind,
count_of: impl Fn(CanonicalKmer) -> u32, count_of: impl Fn(CanonicalKmer) -> u32,
) -> OLMResult<usize> { ) -> OLMResult<usize> {
let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers(); let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers();
@@ -133,7 +134,7 @@ impl Layer<PersistentCompactIntMatrix> {
let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir) let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir)
.map_err(OLMError::Io)?; .map_err(OLMError::Io)?;
let mut col = mb.add_col().map_err(OLMError::Io)?; let mut col = mb.add_col().map_err(OLMError::Io)?;
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| { let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
col.set(slot, count_of(kmer)); col.set(slot, count_of(kmer));
Ok(()) Ok(())
})?; })?;
@@ -145,9 +146,10 @@ impl Layer<PersistentCompactIntMatrix> {
pub fn build_from_map( pub fn build_from_map(
out_dir: &Path, out_dir: &Path,
block_bits: u8, block_bits: u8,
evidence_kind: &EvidenceKind,
counts: &HashMap<CanonicalKmer, u32>, counts: &HashMap<CanonicalKmer, u32>,
) -> OLMResult<usize> { ) -> OLMResult<usize> {
Self::build(out_dir, block_bits, |kmer| counts.get(&kmer).copied().unwrap_or(0)) Self::build(out_dir, block_bits, evidence_kind, |kmer| counts.get(&kmer).copied().unwrap_or(0))
} }
} }
@@ -177,6 +179,7 @@ impl Layer<PersistentBitMatrix> {
pub fn build_presence( pub fn build_presence(
out_dir: &Path, out_dir: &Path,
block_bits: u8, block_bits: u8,
evidence_kind: &EvidenceKind,
n_genomes: usize, n_genomes: usize,
present_in: impl Fn(CanonicalKmer, usize) -> bool, present_in: impl Fn(CanonicalKmer, usize) -> bool,
) -> OLMResult<usize> { ) -> OLMResult<usize> {
@@ -186,7 +189,7 @@ impl Layer<PersistentBitMatrix> {
let mut cols: Vec<_> = (0..n_genomes) let mut cols: Vec<_> = (0..n_genomes)
.map(|_| mb.add_col().map_err(OLMError::Io)) .map(|_| mb.add_col().map_err(OLMError::Io))
.collect::<OLMResult<_>>()?; .collect::<OLMResult<_>>()?;
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| { let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
for (g, col) in cols.iter_mut().enumerate() { for (g, col) in cols.iter_mut().enumerate() {
col.set(slot, present_in(kmer, g)); col.set(slot, present_in(kmer, g));
} }
+3 -2
View File
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use obicompactvec::PersistentCompactIntMatrix; use obicompactvec::PersistentCompactIntMatrix;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obiskio::{UnitigFileWriter, DEFAULT_BLOCK_BITS}; use obiskio::{UnitigFileWriter, DEFAULT_BLOCK_BITS};
use crate::meta::EvidenceKind;
use crate::error::OLMResult; use crate::error::OLMResult;
use crate::layer::{Hit, Layer, LayerData}; use crate::layer::{Hit, Layer, LayerData};
@@ -90,7 +91,7 @@ impl LayeredMap<()> {
pub fn push_layer(&mut self) -> OLMResult<usize> { pub fn push_layer(&mut self) -> OLMResult<usize> {
let i = self.layers.len(); let i = self.layers.len();
let dir = layer_dir(&self.root, i); let dir = layer_dir(&self.root, i);
Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS)?; Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact)?;
self.append_layer()?; self.append_layer()?;
Ok(i) Ok(i)
} }
@@ -102,7 +103,7 @@ impl LayeredMap<PersistentCompactIntMatrix> {
pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> { pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
let i = self.layers.len(); let i = self.layers.len();
let dir = layer_dir(&self.root, i); let dir = layer_dir(&self.root, i);
Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, count_of)?; Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, count_of)?;
self.append_layer()?; self.append_layer()?;
Ok(i) Ok(i)
} }
+85 -15
View File
@@ -4,7 +4,7 @@ use std::path::Path;
use cacheline_ef::{CachelineEf, CachelineEfVec}; use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*; use epserde::prelude::*;
use obikseq::CanonicalKmer; use obikseq::CanonicalKmer;
use obiskio::{UnitigFileReader, UnitigFileWriter, build_unitig_idx}; use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx};
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
use crate::error::{OLMError, OLMResult}; use crate::error::{OLMError, OLMResult};
@@ -198,22 +198,35 @@ impl MphfLayer {
Ok(n) Ok(n)
} }
/// Build MPHF and exact evidence from the unitigs file already present in /// Build MPHF then evidence from the unitigs file already present in `dir`.
/// `dir`. Calls `fill_slot(slot, kmer)` once per kmer for DataStore ///
/// population. Returns the number of kmers indexed. /// - Exact: `.idx` is built for pass-1 parallel construction and kept for
/// query-time kmer verification. `evidence.bin` is written.
/// - Approx: pass-1 uses `open_sequential` + `par_bridge` — no `.idx` is
/// ever created. `fingerprint.bin` is written.
///
/// `fill_slot(slot, kmer)` is called once per kmer in both modes.
pub(crate) fn build( pub(crate) fn build(
dir: &Path, dir: &Path,
block_bits: u8, block_bits: u8,
evidence_kind: &EvidenceKind,
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>, fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
) -> OLMResult<usize> { ) -> OLMResult<usize> {
use rayon::prelude::*; use rayon::prelude::*;
let unitig_path = dir.join(UNITIGS_FILE); let unitig_path = dir.join(UNITIGS_FILE);
build_unitig_idx(&unitig_path, block_bits)?; match evidence_kind {
// ── Exact path ────────────────────────────────────────────────────
let unitigs = UnitigFileReader::open(&unitig_path)?; // .idx is built LAST, once evidence.bin is written, so it is never
let n = unitigs.n_kmers(); // present during construction — only at query time.
EvidenceKind::Exact => {
let n = UnitigFileReader::open_sequential(&unitig_path)?.n_kmers();
let keys = CanonicalKmerIter::new(&unitig_path)
.map_err(|e| match e {
obiskio::SKError::Io(io) => OLMError::Io(io),
e => OLMError::InvalidLayer(e.to_string()),
})?;
if n == 0 { if n == 0 {
fs::File::create(dir.join(EVIDENCE_FILE))?; fs::File::create(dir.join(EVIDENCE_FILE))?;
@@ -223,20 +236,18 @@ impl MphfLayer {
mphf.store(&dir.join(MPHF_FILE)) mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?; .map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
LayerMeta::exact().save(dir)?; LayerMeta::exact().save(dir)?;
build_unitig_idx(&unitig_path, block_bits)?;
return Ok(0); return Ok(0);
} }
// Pass 1 — build MPHF (parallel, random access via .idx) // Pass 1 — MPHF construction via clonable mmap iterator
let keys = (0..unitigs.len())
.into_par_iter()
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
let mphf: Mphf = let mphf: Mphf =
Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default()); Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::<CubicEps>::default());
mphf.store(&dir.join(MPHF_FILE)) mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?; .map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
// Pass 2 — fill evidence + mode-specific data via callback // Pass 2 — sequential: fill evidence.bin + callback
let unitigs2 = UnitigFileReader::open(&unitig_path)?; let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
let mut ev = EvidenceWriter::new(n); let mut ev = EvidenceWriter::new(n);
let mut seen = vec![0u8; (n + 7) / 8]; let mut seen = vec![0u8; (n + 7) / 8];
@@ -257,6 +268,65 @@ impl MphfLayer {
ev.write(&dir.join(EVIDENCE_FILE))?; ev.write(&dir.join(EVIDENCE_FILE))?;
LayerMeta::exact().save(dir)?; LayerMeta::exact().save(dir)?;
// .idx built last: strictly for query-time kmer verification
build_unitig_idx(&unitig_path, block_bits)?;
Ok(n) Ok(n)
} }
// ── Approx path ───────────────────────────────────────────────────
// No .idx is created at any point.
EvidenceKind::Approx { b, z } => {
let unitigs = UnitigFileReader::open_sequential(&unitig_path)?;
let n = unitigs.n_kmers();
if n == 0 {
FingerprintVecWriter::new(0, *b).write(&dir.join(FINGERPRINT_FILE))?;
let mphf: Mphf =
Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
LayerMeta::approx(*b, *z).save(dir)?;
return Ok(0);
}
// Pass 1 — MPHF construction via mmap-backed clonable iterator.
// No .idx is created. par_bridge() parallelises the sequential scan;
// Clone on CanonicalKmerRawIter shares the Arc<Mmap> and resets to pos 0.
let keys = CanonicalKmerIter::new(&unitig_path)
.map_err(|e| match e {
obiskio::SKError::Io(io) => OLMError::Io(io),
e => OLMError::InvalidLayer(e.to_string()),
})?;
let mphf: Mphf =
Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::<CubicEps>::default());
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
// Pass 2 — sequential: fill fingerprint.bin + callback
let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
let mut fw = FingerprintVecWriter::new(n, *b);
let mut seen = vec![0u8; (n + 7) / 8];
for kmer in unitigs2.iter_canonical_kmers() {
let slot = mphf.index(&kmer.raw());
if slot >= n {
return Err(OLMError::Mphf("slot out of bounds".into()));
}
let byte = slot / 8;
let bit = 1u8 << (slot % 8);
if seen[byte] & bit != 0 {
return Err(OLMError::Mphf("duplicate slot".into()));
}
seen[byte] |= bit;
fw.set(slot, kmer.seq_hash());
fill_slot(slot, kmer)?;
}
fw.write(&dir.join(FINGERPRINT_FILE))?;
LayerMeta::approx(*b, *z).save(dir)?;
Ok(n)
}
}
}
} }
+5 -3
View File
@@ -2,6 +2,7 @@ use super::*;
use obicompactvec::PersistentCompactIntMatrix; use obicompactvec::PersistentCompactIntMatrix;
use obikseq::{set_k, Kmer, Sequence as _, Unitig}; use obikseq::{set_k, Kmer, Sequence as _, Unitig};
use obiskio::DEFAULT_BLOCK_BITS; use obiskio::DEFAULT_BLOCK_BITS;
use crate::meta::EvidenceKind;
use tempfile::tempdir; use tempfile::tempdir;
fn write_unitigs(dir: &Path, seqs: &[&[u8]]) { fn write_unitigs(dir: &Path, seqs: &[&[u8]]) {
@@ -24,7 +25,7 @@ fn build_and_query_all_kmers_found() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]); write_unitigs(dir.path(), &[b"AAAACGT"]);
let kmers = all_canonical_kmers(dir.path()); let kmers = all_canonical_kmers(dir.path());
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap(); Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
let layer = Layer::<()>::open(dir.path()).unwrap(); let layer = Layer::<()>::open(dir.path()).unwrap();
for kmer in kmers { for kmer in kmers {
assert!(layer.query(kmer).is_some(), "kmer should be present"); assert!(layer.query(kmer).is_some(), "kmer should be present");
@@ -42,6 +43,7 @@ fn counts_are_stored_and_retrieved() {
Layer::<PersistentCompactIntMatrix>::build( Layer::<PersistentCompactIntMatrix>::build(
dir.path(), dir.path(),
DEFAULT_BLOCK_BITS, DEFAULT_BLOCK_BITS,
&EvidenceKind::Exact,
|kmer| count_map.get(&kmer).copied().unwrap_or(0), |kmer| count_map.get(&kmer).copied().unwrap_or(0),
).unwrap(); ).unwrap();
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap(); let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
@@ -56,7 +58,7 @@ fn query_absent_returns_none() {
set_k(4); set_k(4);
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]); write_unitigs(dir.path(), &[b"AAAACGT"]);
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap(); Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
let layer = Layer::<()>::open(dir.path()).unwrap(); let layer = Layer::<()>::open(dir.path()).unwrap();
let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical(); let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical();
assert!(layer.query(absent).is_none()); assert!(layer.query(absent).is_none());
@@ -67,7 +69,7 @@ fn open_after_build_is_consistent() {
set_k(4); set_k(4);
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]); write_unitigs(dir.path(), &[b"AAAACGT"]);
let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, |_| 7).unwrap(); let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, |_| 7).unwrap();
assert_eq!(n, 4); assert_eq!(n, 4);
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap(); let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical(); let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical();
+2 -1
View File
@@ -8,7 +8,8 @@ pub use error::{SKError, SKResult};
pub use meta::SKFileMeta; pub use meta::SKFileMeta;
pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with}; pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with};
pub use reader::{SKFileIter, SKFileReader}; pub use reader::{SKFileIter, SKFileReader};
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS}; pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS,
CanonicalKmerIter};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
+80
View File
@@ -1,6 +1,7 @@
use std::fs::File; use std::fs::File;
use std::io::{BufWriter, Write as _}; use std::io::{BufWriter, Write as _};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use memmap2::Mmap; use memmap2::Mmap;
use obikseq::{CanonicalKmer, Kmer, Unitig}; use obikseq::{CanonicalKmer, Kmer, Unitig};
@@ -439,6 +440,85 @@ fn extract_kmer_raw(bytes: &[u8], j: usize, k: usize) -> u64 {
raw << (64 - 2 * k) raw << (64 - 2 * k)
} }
// ── CanonicalKmerRawIter ──────────────────────────────────────────────────────
// ── CanonicalKmerIter ─────────────────────────────────────────────────────────
/// Sequential iterator over [`CanonicalKmer`] from a `unitigs.bin` file.
///
/// Holds an `Arc<Mmap>` so that `Clone` is O(1): both copies share the same
/// memory-mapped pages. Cloning resets the cursor to position 0 — this lets
/// ptr_hash's `new_from_par_iter` (which requires a `Clone`-able parallel
/// iterator via `par_bridge()`) make multiple passes without ever creating
/// a `.idx` file.
pub struct CanonicalKmerIter {
mmap: Arc<Mmap>,
k: usize,
chunk_pos: usize, // byte offset of the current chunk header
data_pos: usize, // byte offset of the current chunk's sequence bytes
n_kmers: usize, // kmers in current chunk
kmer_idx: usize, // next kmer index to yield within the current chunk
}
impl CanonicalKmerIter {
pub fn new(path: &Path) -> SKResult<Self> {
let file = File::open(path).map_err(SKError::Io)?;
let mmap = Arc::new(unsafe { Mmap::map(&file).map_err(SKError::Io)? });
let k = obikseq::params::k();
let mut s = Self { mmap, k, chunk_pos: 0, data_pos: 0, n_kmers: 0, kmer_idx: 0 };
s.load_chunk();
Ok(s)
}
#[inline]
fn load_chunk(&mut self) {
if self.chunk_pos < self.mmap.len() {
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
self.n_kmers = seql_minus_k + 1;
self.data_pos = self.chunk_pos + 1;
self.kmer_idx = 0;
}
}
}
impl Clone for CanonicalKmerIter {
fn clone(&self) -> Self {
let mut c = Self {
mmap: Arc::clone(&self.mmap),
k: self.k,
chunk_pos: 0,
data_pos: 0,
n_kmers: 0,
kmer_idx: 0,
};
c.load_chunk();
c
}
}
impl Iterator for CanonicalKmerIter {
type Item = CanonicalKmer;
#[inline]
fn next(&mut self) -> Option<CanonicalKmer> {
loop {
if self.chunk_pos >= self.mmap.len() {
return None;
}
if self.kmer_idx < self.n_kmers {
let raw = extract_kmer_raw(&self.mmap[self.data_pos..], self.kmer_idx, self.k);
let canon = canonical_raw(raw, self.k);
self.kmer_idx += 1;
return Some(CanonicalKmer::from_raw_unchecked(canon));
}
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
let byte_len = (seql_minus_k + self.k + 3) / 4;
self.chunk_pos += 1 + byte_len;
self.load_chunk();
}
}
}
#[cfg(test)] #[cfg(test)]
#[path = "tests/unitig_index.rs"] #[path = "tests/unitig_index.rs"]
mod tests; mod tests;