feat: add benchmark pipeline, expose APIs, and enforce strict paths

Introduces a Make-based orchestration for simulating, indexing, merging, filtering, and verifying k-mer counts and presence. Exposes internal builder and iterator APIs publicly, enforces mandatory leading slashes for predicate patterns, registers the `obitaxonomy` crate, and updates tooling configurations alongside documentation.
This commit is contained in:
Eric Coissac
2026-06-19 09:55:41 +02:00
parent 280ca1f5a3
commit c694e1f2b0
42 changed files with 2585 additions and 84 deletions
+137
View File
@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""Build a reference kmer index from paired-end FASTQ reads.
Extracts canonical kmers — min(kmer, revcomp(kmer)) encoded as uint64 —
counts their abundances, and saves a sorted numpy pair (kmers, counts).
Output .npz arrays
kmers : uint64, sorted ascending — canonical kmer integers
counts : uint32, same order — raw read abundances
"""
import argparse
import gzip
import sys
from collections import defaultdict
import numpy as np
# ── encoding ────────────────────────────────────────────────────────────────
_ENCODE = {'A': 0, 'C': 1, 'G': 2, 'T': 3,
'a': 0, 'c': 1, 'g': 2, 't': 3}
# Lookup table: revcomp of one byte (4 bases, 8 bits).
# Precomputed once at import time.
_REVCOMP8 = [0] * 256
for _i in range(256):
_rc, _x = 0, _i
for _ in range(4):
_rc = (_rc << 2) | (3 - (_x & 3))
_x >>= 2
_REVCOMP8[_i] = _rc
del _i, _rc, _x
def revcomp_int(kmer: int, k: int) -> int:
"""Reverse-complement of a kmer encoded as an integer (2 bits/base).
Uses byte-level lookup (4 bases at a time) for speed.
"""
rc = 0
bits_left = 2 * k
while bits_left > 0:
chunk = min(8, bits_left)
rc_byte = _REVCOMP8[kmer & 0xFF] >> (8 - chunk)
rc = (rc << chunk) | rc_byte
kmer >>= chunk
bits_left -= chunk
return rc
# ── FASTQ parsing ────────────────────────────────────────────────────────────
def iter_sequences(path: str):
"""Yield raw sequences from a (gzipped) FASTQ file."""
opener = gzip.open if path.endswith('.gz') else open
with opener(path, 'rt') as fh:
while True:
if not fh.readline(): # '@' header
break
seq = fh.readline().rstrip('\n')
fh.readline() # '+'
fh.readline() # quality
yield seq
# ── kmer counting ────────────────────────────────────────────────────────────
def count_kmers(paths: list[str], k: int) -> dict[int, int]:
mask = (1 << (2 * k)) - 1
counts: dict[int, int] = defaultdict(int)
n_reads = 0
for path in paths:
for seq in iter_sequences(path):
n_reads += 1
kmer = 0
run = 0 # consecutive valid bases
for c in seq:
b = _ENCODE.get(c)
if b is None: # N or unexpected character → reset
kmer = 0
run = 0
continue
kmer = ((kmer << 2) | b) & mask
run += 1
if run >= k:
rc = revcomp_int(kmer, k)
counts[kmer if kmer <= rc else rc] += 1
if n_reads % 100_000 == 0:
print(f' {n_reads:,} reads processed, '
f'{len(counts):,} distinct kmers so far',
file=sys.stderr)
print(f' {n_reads:,} reads total, {len(counts):,} distinct kmers',
file=sys.stderr)
return counts
# ── main ─────────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument('reads', nargs='+', metavar='FASTQ',
help='Input reads (FASTQ, gzip OK)')
ap.add_argument('-k', '--kmer-size', type=int, default=31,
metavar='K')
ap.add_argument('--min-abundance', type=int, default=1,
metavar='N', help='Drop kmers with count < N (default 1)')
ap.add_argument('-o', '--output', required=True,
metavar='FILE', help='Output .npz path')
args = ap.parse_args()
print(f'k={args.kmer_size} files={len(args.reads)}', file=sys.stderr)
counts = count_kmers(args.reads, args.kmer_size)
if args.min_abundance > 1:
before = len(counts)
counts = {k: v for k, v in counts.items() if v >= args.min_abundance}
print(f' min-abundance={args.min_abundance}: '
f'{before - len(counts):,} kmers dropped, '
f'{len(counts):,} retained',
file=sys.stderr)
print(f'Sorting and saving → {args.output}', file=sys.stderr)
kmers_arr = np.fromiter(sorted(counts), dtype=np.uint64, count=len(counts))
counts_arr = np.array([counts[int(k)] for k in kmers_arr], dtype=np.uint32)
np.savez_compressed(args.output, kmers=kmers_arr, counts=counts_arr)
print(f'Done {len(kmers_arr):,} kmers → {args.output}', file=sys.stderr)
if __name__ == '__main__':
main()