Files
obikmer/docmd/implementation/pipeline.md
T

162 lines
9.1 KiB
Markdown
Raw Normal View History

2026-04-16 22:38:20 +02:00
# Construction pipeline
All phases after scatter are embarrassingly parallel across partitions.
## Phase 0 — Parameter estimation
The construction parameters p, n, and min_count depend on the kmer frequency spectrum of the dataset. Estimating this spectrum before construction avoids costly re-partitioning if p is badly chosen.
Two approaches are supported:
- **External estimation (preferred):** run [NT-CARD](https://github.com/bcgsc/ntCard) on the input files and pass its histogram output to `obikmer build`. NT-CARD produces a kmer frequency histogram in a single streaming pass using ntHash and a Flajolet-Martin-style estimator; obikmer reads this file and derives p, n, and min_count automatically.
- **Internal estimation (future):** an `obikmer estimate` subcommand for users who prefer a single-tool workflow. The implementation would combine two components: (1) **ntHash**, a rolling hash that updates the kmer hash in O(1) per nucleotide by incrementally adding the incoming base and removing the outgoing one — Rust crates exist; (2) a **Flajolet-Martin-style streaming estimator** that maintains a small table of minimum hash values and infers the frequency histogram from their statistical distribution, as described in the NT-CARD paper [@Mohamadi2017-ok].
The histogram gives:
- **F0** (number of distinct kmers) → sets p (target ~10M kmers/partition → p = ⌈log₂(F0 / 10M)⌉)
- **frequency distribution** → sets n (choose n so that fewer than 1% of kmers overflow)
- **error valley** → suggests min_count (typically the local minimum between the error peak and the coverage peak)
## Phase 1 — Scatter
Single streaming pass over raw input files (FASTA/FASTQ, gzip). FASTQ quality scores are ignored. For each read:
1. **Ambiguous base filter**: cut at any non-ACGT base; discard fragments shorter than k.
2. **Entropy filter**: scan each fragment with a sliding window of size k. When the kmer $K_i = S[i \mathinner{..} i+k-1]$ ended by nucleotide $S[j]$ (with $j = i+k-1$) has entropy below threshold $\theta$, emit the current segment and start a new one (see algorithm below). $K_i$ belongs to neither segment, and no valid kmer is lost.
3. **Length filter**: discard any segment shorter than k produced by step 2.
4. **Super-kmer extraction**: for each clean segment, slide a minimizer window and group consecutive kmers sharing the same canonical minimizer; canonise each super-kmer by lexicographic comparison with its reverse complement (early exit).
5. **Partition routing**: `hash(canonical_minimizer) → PART` → append super-kmer to `partition/superkmers.bin.gz`.
**Segmentation behavior:**
When $K_i$ (ended by $S[j]$, $j = i+k-1$) fails the entropy threshold:
- Current segment $S[\textit{seg_start} \mathinner{..} j-1]$ is emitted (last valid kmer = $K_{i-1}$)
- New segment starts at $S[i+1]$ (first new kmer = $K_{i+1}$)
- $K_i$ is excluded: current segment lacks $S[j]$, new segment lacks $S[i]$
- Overlap = $S[i+1 \mathinner{..} j-1]$ = $k-2$ nucleotides
!!! abstract "Algorithm — Entropy filter: sliding window segmentation"
```text
procedure EntropyFilter(S, N, k, θ):
seg_start ← 0
window ← []
for j ← 0 to N1:
window.push(S[j])
if |window| < k: continue
i ← j k + 1
if entropy(window) ≤ θ:
emit S[seg_start .. j1]
seg_start ← i + 1
window ← S[i+1 .. j]
else:
window.pop_front()
emit S[seg_start .. N1]
```
Writes are sequential and append-only — IO-friendly. Gzip applied at write time. Data volume ≈ raw genome size (2 bits/nt compaction offsets header overhead).
## Phase 2 — Dereplication
Performed independently per partition. Identical super-kmers are consolidated and their COUNT accumulated — analogous to amplicon dereplication in metabarcoding. Uses external bucket sort to stay within RAM bounds:
**Pass 1** (streaming): hash the nucleotide payload of each super-kmer, route to one of B bucket files:
```
hash(sequence) % B → bucket_i.bin
```
B ≈ 100 is tunable; RAM needed ≈ partition_size / B.
**Pass 2**: for each bucket, load into an in-memory `HashMap<sequence, COUNT>`, dereplicate by summing COUNT values, write consolidated super-kmers.
After dereplication: at Nx coverage the partition shrinks by ~x (errors aside). The COUNT field in each super-kmer header = number of times that exact super-kmer sequence was observed across all input reads.
**Important:** super-kmer COUNT ≠ individual kmer count. A kmer can appear in multiple distinct super-kmers (same partition, different flanking context); its true count = sum of COUNT of all super-kmers containing it. A super-kmer with COUNT=1 may contain only high-abundance kmers, each appearing in many other super-kmers. Abundance filtering therefore cannot be applied at this phase.
## Phase 3 — Per-kmer count aggregation and quorum filtering
For each dereplicated super-kmer, enumerate its kmers and accumulate counts:
```
for each super-kmer (sequence, COUNT):
for each kmer in sequence:
kmer_counts[canonical(kmer)] += COUNT
```
Implemented as an external sort or a temporary HashMap, depending on partition size. At the end of this phase, each distinct canonical kmer has its exact total count.
Abundance filter applied here: kmers with `total_count < q` are discarded. `q` is a collection parameter (0 = keep all, including singletons for ≤1x data).
No pre-filter on super-kmer COUNT is possible at phase 2: a super-kmer with COUNT=1 may contain only high-abundance kmers, each present in many other super-kmers across the partition.
## Phase 4 — Super-kmer compaction
The valid kmer set from phase 3 is used as a mask to rewrite the super-kmer files:
```
for each dereplicated super-kmer:
scan kmer by kmer
kmer not in valid set → break point (terminates current super-kmer)
kmer in valid set → extend current super-kmer
```
Three cases per super-kmer:
- **All kmers valid** → copied as-is
- **No kmer valid** → discarded
- **Mixed** → split into sub-super-kmers at invalid boundaries; each sub-super-kmer inherits the original COUNT
After splitting, re-apply dereplication (bucket sort, phase 2 method) — splitting can produce new identical super-kmers. This re-dereplication is cheap: the volume is already greatly reduced.
Output: a clean super-kmer file where every kmer passes quorum. This file feeds phase 5.
## Phase 5 — Local de Bruijn graph and unitig construction
Within each partition, build a **local de Bruijn graph** from the valid kmer set and compute its unitigs. All operations are local to the partition — no cross-partition communication.
```
valid kmers → HashSet<u64>
for each kmer K:
out_degree = |{K[1:]+b | b ∈ {A,C,G,T}} ∩ HashSet|
in_degree = |{b+K[:-1] | b ∈ {A,C,G,T}} ∩ HashSet|
internal node ↔ in_degree=1 AND out_degree=1
branching / dead-end → unitig start or end
```
Traverse non-branching paths to assemble unitigs. Kmers whose neighbours fall in other partitions appear as dead ends locally — they terminate the unitig. The result: **each kmer appears in exactly one unitig** within the partition.
The partition size (controlled by p) must be calibrated so that the HashSet fits in RAM during this phase.
Output: `unitigs.bin` — the permanent evidence structure for the partition. Each kmer in the partition appears at exactly one (unitig_id, offset) location.
**Scope of local unitigs:** these are unitigs of the partition's local de Bruijn graph, not global unitigs. A kmer whose k-1 successor or predecessor falls in another partition appears as a dead end locally and terminates the unitig. This does not affect correctness of verification but means partition-local unitigs cannot be directly reused for global assembly.
## Phase 6 — MPHF construction and index finalisation
Built once on the definitive kmer set (all kmers in all unitigs of the partition). See [obilayeredmap](obilayeredmap.md) and [MPHF selection](mphf.md) for the current implementation.
2026-04-16 22:38:20 +02:00
```
kmers from unitigs → MPHF → mphf.bin
→ evidence.bin : n × u32, each = (chunk_id: 25 bits | rank: 7 bits)
→ payload : counts/ (mode 2) or presence/ (mode 3)
2026-04-16 22:38:20 +02:00
```
The MPHF is built in two passes over `unitigs.bin`: parallel pass for `mphf.bin`, sequential pass for `evidence.bin` and payload. The exact kmer count is available from the unitig index (`unitigs.bin.idx`) before the passes begin.
2026-04-16 22:38:20 +02:00
**Exact verification via unitig evidence:**
`unitigs.bin` serves as the evidence structure. The MPHF maps every input to `[0, N)` including absent kmers — the unitig read-back (via `evidence.bin`) is the only correct membership test.
2026-04-16 22:38:20 +02:00
```
query kmer q
→ canonical_minimizer(q) → hash → PART → part_XXXXX/
→ MPHF(q) → slot s
→ evidence[s] = (chunk_id, rank)
→ read k nucleotides at rank in unitigs[chunk_id] → compare with q
→ match : return payload[s] ← exact hit
→ no match: kmer absent ← MPHF collision on absent kmer
2026-04-16 22:38:20 +02:00
```
`superkmers.bin.gz` is no longer needed at this point and can be deleted.