Files
obikmer/scripts/compare_kmers.py
T
Eric Coissac 27f5e88a7b refactor: implement RoutableSuperKmer and update k-mer indexing pipeline
Replace raw SuperkMer routing with a new RoutableSuperKimer type that embeds canonical sequences and precomputed minimizers, enabling direct partition routing via hash. Update the build pipeline to yield RoutableSuperKmers throughout (builder, scatterer), refactor FASTA/unitig export commands to use the new type and compressed outputs (.fasta.gz, .unitigs.fasta.zst), revise SuperKmer header to store n_kmers instead of seql (avoiding 256-byte wrap), and update documentation to reflect minimizer-based theory, two evidence-encoding strategies for unitig-MPHF indexing (global offset vs. ID+rank), and the new obipipeline library architecture with parallel workers, biased scheduling, and error handling.
2026-05-01 09:33:26 +02:00

107 lines
2.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""Compare the canonical k-mer sets of two FASTA files.
Reports how many k-mers are shared, exclusive to each file, or missing.
Handles plain and gzip-compressed FASTA (.gz).
Usage
-----
compare_kmers.py -k 31 file_a.fasta.gz file_b.fasta.gz
"""
import argparse
import gzip
import sys
from pathlib import Path
COMP = str.maketrans("ACGTacgt", "TGCAtgca")
def revcomp(seq: str) -> str:
return seq.translate(COMP)[::-1]
def canonical(seq: str) -> str:
rc = revcomp(seq)
return seq if seq <= rc else rc
def open_fasta(path: str):
p = Path(path)
if p.suffix == ".gz":
return gzip.open(path, "rt")
return open(path, "r")
def iter_sequences(path: str):
"""Yield (header, sequence) pairs from a FASTA file."""
header = None
parts = []
with open_fasta(path) as fh:
for line in fh:
line = line.rstrip()
if line.startswith(">"):
if header is not None:
yield header, "".join(parts)
header = line[1:]
parts = []
else:
parts.append(line.upper())
if header is not None:
yield header, "".join(parts)
def extract_kmers(path: str, k: int) -> set[str]:
"""Return the set of canonical k-mers from all sequences in *path*."""
kmers: set[str] = set()
for _, seq in iter_sequences(path):
# skip any character that is not ACGT
for i in range(len(seq) - k + 1):
kmer = seq[i : i + k]
if all(c in "ACGT" for c in kmer):
kmers.add(canonical(kmer))
return kmers
def main():
parser = argparse.ArgumentParser(
description="Compare canonical k-mer sets between two FASTA files."
)
parser.add_argument("file_a", help="First FASTA file (reference)")
parser.add_argument("file_b", help="Second FASTA file (to compare)")
parser.add_argument(
"-k", "--kmer-size", type=int, default=31, metavar="K", help="k-mer size (default: 31)"
)
args = parser.parse_args()
k = args.kmer_size
print(f"k = {k}")
print(f"A = {args.file_a}")
print(f"B = {args.file_b}")
print()
print("reading A …", file=sys.stderr)
set_a = extract_kmers(args.file_a, k)
print("reading B …", file=sys.stderr)
set_b = extract_kmers(args.file_b, k)
only_a = set_a - set_b
only_b = set_b - set_a
common = set_a & set_b
print(f"{'kmers in A':<25} {len(set_a):>12,}")
print(f"{'kmers in B':<25} {len(set_b):>12,}")
print(f"{'common':<25} {len(common):>12,}")
print(f"{'only in A (lost)':<25} {len(only_a):>12,}")
print(f"{'only in B (gained)':<25} {len(only_b):>12,}")
if only_a or only_b:
print("\nSets differ.", file=sys.stderr)
sys.exit(1)
else:
print("\nSets are identical.")
if __name__ == "__main__":
main()