#!/usr/bin/env python3 """Build a reference kmer index from paired-end FASTQ reads. Extracts canonical kmers — min(kmer, revcomp(kmer)) encoded as uint64 — counts their abundances, and saves a sorted numpy pair (kmers, counts). Output .npz arrays kmers : uint64, sorted ascending — canonical kmer integers counts : uint32, same order — raw read abundances """ import argparse import gzip import sys from collections import defaultdict import numpy as np # ── encoding ──────────────────────────────────────────────────────────────── _ENCODE = {'A': 0, 'C': 1, 'G': 2, 'T': 3, 'a': 0, 'c': 1, 'g': 2, 't': 3} # Lookup table: revcomp of one byte (4 bases, 8 bits). # Precomputed once at import time. _REVCOMP8 = [0] * 256 for _i in range(256): _rc, _x = 0, _i for _ in range(4): _rc = (_rc << 2) | (3 - (_x & 3)) _x >>= 2 _REVCOMP8[_i] = _rc del _i, _rc, _x def revcomp_int(kmer: int, k: int) -> int: """Reverse-complement of a kmer encoded as an integer (2 bits/base). Uses byte-level lookup (4 bases at a time) for speed. """ rc = 0 bits_left = 2 * k while bits_left > 0: chunk = min(8, bits_left) rc_byte = _REVCOMP8[kmer & 0xFF] >> (8 - chunk) rc = (rc << chunk) | rc_byte kmer >>= chunk bits_left -= chunk return rc # ── FASTQ parsing ──────────────────────────────────────────────────────────── def iter_sequences(path: str): """Yield raw sequences from a (gzipped) FASTQ file.""" opener = gzip.open if path.endswith('.gz') else open with opener(path, 'rt') as fh: while True: if not fh.readline(): # '@' header break seq = fh.readline().rstrip('\n') fh.readline() # '+' fh.readline() # quality yield seq # ── kmer counting ──────────────────────────────────────────────────────────── def count_kmers(paths: list[str], k: int) -> dict[int, int]: mask = (1 << (2 * k)) - 1 counts: dict[int, int] = defaultdict(int) n_reads = 0 for path in paths: for seq in iter_sequences(path): n_reads += 1 kmer = 0 run = 0 # consecutive valid bases for c in seq: b = _ENCODE.get(c) if b is None: # N or unexpected character → reset kmer = 0 run = 0 continue kmer = ((kmer << 2) | b) & mask run += 1 if run >= k: rc = revcomp_int(kmer, k) counts[kmer if kmer <= rc else rc] += 1 if n_reads % 100_000 == 0: print(f' {n_reads:,} reads processed, ' f'{len(counts):,} distinct kmers so far', file=sys.stderr) print(f' {n_reads:,} reads total, {len(counts):,} distinct kmers', file=sys.stderr) return counts # ── main ───────────────────────────────────────────────────────────────────── def main() -> None: ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument('reads', nargs='+', metavar='FASTQ', help='Input reads (FASTQ, gzip OK)') ap.add_argument('-k', '--kmer-size', type=int, default=31, metavar='K') ap.add_argument('--min-abundance', type=int, default=1, metavar='N', help='Drop kmers with count < N (default 1)') ap.add_argument('-o', '--output', required=True, metavar='FILE', help='Output .npz path') args = ap.parse_args() print(f'k={args.kmer_size} files={len(args.reads)}', file=sys.stderr) counts = count_kmers(args.reads, args.kmer_size) if args.min_abundance > 1: before = len(counts) counts = {k: v for k, v in counts.items() if v >= args.min_abundance} print(f' min-abundance={args.min_abundance}: ' f'{before - len(counts):,} kmers dropped, ' f'{len(counts):,} retained', file=sys.stderr) print(f'Sorting and saving → {args.output}', file=sys.stderr) kmers_arr = np.fromiter(sorted(counts), dtype=np.uint64, count=len(counts)) counts_arr = np.array([counts[int(k)] for k in kmers_arr], dtype=np.uint32) np.savez_compressed(args.output, kmers=kmers_arr, counts=counts_arr) print(f'Done {len(kmers_arr):,} kmers → {args.output}', file=sys.stderr) if __name__ == '__main__': main()