#!/usr/bin/env python3
"""Compare an obikmer index against a reference kmer set (presence/absence).

Loads the reference .npz (sorted uint64 kmers built by build_reference.py),
streams the output of `obikmer dump`, encodes each kmer string to uint64,
then reports false negatives and false positives using numpy set operations.

Output to stdout: one CSV row
  species, strain, ref_kmers, idx_kmers, false_neg, false_pos, fn_pct, fp_pct
"""

import argparse
import subprocess
import sys
from collections.abc import Iterable, Iterator

import numpy as np

# ── encoding ──────────────────────────────────────────────────────────────────

_ENCODE = {'A': 0, 'C': 1, 'G': 2, 'T': 3,
           'a': 0, 'c': 1, 'g': 2, 't': 3}
_DECODE = ['A', 'C', 'G', 'T']

# Two bits per base: a uint64 can hold at most 32 bases.
_MAX_K = 32


def encode_kmer(s: str) -> int:
    """Pack a nucleotide string into an integer, 2 bits per base.

    The first base ends up in the most significant bit pair.  Upper- and
    lower-case A/C/G/T are accepted.

    Raises:
        KeyError: if *s* contains any character other than A/C/G/T.
    """
    kmer = 0
    for c in s:
        kmer = (kmer << 2) | _ENCODE[c]
    return kmer


def decode_kmer(val: int, k: int) -> str:
    """Unpack a 2-bit-per-base integer into an upper-case string of *k* bases.

    Inverse of `encode_kmer` for a kmer of length *k*.
    """
    bases = []
    for _ in range(k):
        # The low bit pair is the LAST base, hence the reversal below.
        bases.append(_DECODE[val & 3])
        val >>= 2
    return ''.join(reversed(bases))


# ── dump parsing ──────────────────────────────────────────────────────────────

def iter_dump_kmers(lines: Iterable[str]) -> Iterator[int]:
    """Yield the encoded kmer of every data row of a dump.

    The first line is treated as a header and skipped.  The kmer is the text
    before the first comma of each row; surrounding whitespace (including the
    line terminator of a row that has no comma) is stripped, and blank rows
    are ignored.

    Raises:
        KeyError: if a kmer field contains a character other than A/C/G/T.
    """
    rows = iter(lines)
    next(rows, None)  # header
    for line in rows:
        kmer_str = line.split(',', 1)[0].strip()
        if kmer_str:
            yield encode_kmer(kmer_str)


def load_index_kmers(obikmer_bin: str, index_dir: str) -> np.ndarray:
    """Stream `obikmer dump` and return a sorted uint64 array of kmer integers.

    The returned array is de-duplicated so that it can be used directly in
    set comparisons and so that its length is the number of distinct kmers.
    Exits the program with status 1 if the dump command fails.
    """
    cmd = [obikmer_bin, 'dump', index_dir]
    # The context manager closes the pipe and reaps the child even when
    # parsing raises part-way through the stream.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.DEVNULL, text=True) as proc:
        kmers = list(iter_dump_kmers(proc.stdout))

    if proc.returncode != 0:
        print(f'ERROR: obikmer dump exited {proc.returncode}', file=sys.stderr)
        sys.exit(1)

    # np.unique sorts and removes duplicates in one pass.
    return np.unique(np.array(kmers, dtype=np.uint64))


def detect_k(obikmer_bin: str, index_dir: str) -> int:
    """Return the kmer length of the index, read from the first dump row.

    Runs `obikmer dump --head 1` and measures the kmer field of the first
    row after the header.  Exits the program with status 1 if the command
    cannot be run, fails, or produces no data row.
    """
    cmd = [obikmer_bin, 'dump', '--head', '1', index_dir]
    try:
        out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL,
                                      text=True)
    except (OSError, subprocess.CalledProcessError) as exc:
        print(f'ERROR: obikmer dump --head failed: {exc}', file=sys.stderr)
        sys.exit(1)

    rows = out.splitlines()
    if len(rows) < 2:
        print(f'ERROR: obikmer dump returned no kmer for {index_dir}',
              file=sys.stderr)
        sys.exit(1)
    return len(rows[1].split(',')[0].strip())


# ── comparison ────────────────────────────────────────────────────────────────

def compare(ref: np.ndarray, idx: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Return (false_negatives, false_positives) as uint64 arrays.

    False negatives are kmers present in *ref* but absent from *idx*; false
    positives are the reverse.  Both results are sorted and duplicate-free.
    Inputs are not assumed to be unique: numpy documents that
    ``assume_unique=True`` may give wrong results on inputs with duplicates.
    """
    false_neg = np.setdiff1d(ref, idx)
    false_pos = np.setdiff1d(idx, ref)
    return false_neg, false_pos


# ── main ──────────────────────────────────────────────────────────────────────

def _save_kmers(path: str, values: np.ndarray, k: int) -> None:
    """Write each encoded kmer of *values* to *path*, one decoded kmer per line."""
    with open(path, 'w') as fh:
        fh.writelines(decode_kmer(int(v), k) + '\n' for v in values)


def main() -> None:
    """Command-line entry point: compare an index to a reference, print a CSV row.

    Progress and summary messages go to stderr; stdout receives only the CSV
    header (with --header) or the single CSV result row.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument('reference', metavar='REF_NPZ', nargs='?',
                    help='Reference .npz file')
    ap.add_argument('index', metavar='INDEX_DIR', nargs='?',
                    help='obikmer index directory')
    ap.add_argument('--obikmer', default='obikmer',
                    help='Path to obikmer binary')
    ap.add_argument('--species', default='',
                    help='Species label for CSV row')
    ap.add_argument('--strain', default='',
                    help='Strain label for CSV row')
    ap.add_argument('--header', action='store_true',
                    help='Print CSV header and exit')
    ap.add_argument('--save-fp', metavar='FILE',
                    help='Save false-positive kmer strings to FILE')
    ap.add_argument('--save-fn', metavar='FILE',
                    help='Save false-negative kmer strings to FILE')
    args = ap.parse_args()

    if args.header:
        print('species,strain,ref_kmers,idx_kmers,'
              'false_neg,false_pos,fn_pct,fp_pct')
        return

    # The positionals are optional only so that --header can be used alone.
    if args.reference is None or args.index is None:
        ap.error('REF_NPZ and INDEX_DIR are required unless --header is given')

    # Detect k from the index (one cheap call before the full dump).
    k = detect_k(args.obikmer, args.index)
    if not 0 < k <= _MAX_K:
        print(f'ERROR: unsupported kmer length k={k} '
              f'(must be between 1 and {_MAX_K})', file=sys.stderr)
        sys.exit(1)

    # Load reference
    print(f'Loading reference: {args.reference}', file=sys.stderr)
    with np.load(args.reference) as npz:
        ref_kmers = npz['kmers']  # already sorted uint64

    # Load index
    print(f'Streaming dump (k={k}): {args.index}', file=sys.stderr)
    idx_kmers = load_index_kmers(args.obikmer, args.index)

    print(f'k={k} ref={len(ref_kmers):,} idx={len(idx_kmers):,}',
          file=sys.stderr)

    false_neg, false_pos = compare(ref_kmers, idx_kmers)

    fn_pct = 100.0 * len(false_neg) / len(ref_kmers) if len(ref_kmers) else 0.0
    fp_pct = 100.0 * len(false_pos) / len(idx_kmers) if len(idx_kmers) else 0.0

    print(f'false negatives: {len(false_neg):,} ({fn_pct:.4f}%)',
          file=sys.stderr)
    print(f'false positives: {len(false_pos):,} ({fp_pct:.4f}%)',
          file=sys.stderr)

    # Files are only written when there is something to report.
    if args.save_fn and len(false_neg):
        _save_kmers(args.save_fn, false_neg, k)
        print(f'False negatives saved → {args.save_fn}', file=sys.stderr)

    if args.save_fp and len(false_pos):
        _save_kmers(args.save_fp, false_pos, k)
        print(f'False positives saved → {args.save_fp}', file=sys.stderr)

    print(f'{args.species},{args.strain},'
          f'{len(ref_kmers)},{len(idx_kmers)},'
          f'{len(false_neg)},{len(false_pos)},'
          f'{fn_pct:.4f},{fp_pct:.4f}')


if __name__ == '__main__':
    main()