Files
obikmer/scripts/compare_kmers.py
T
Eric Coissac 5169f65dc9 feat: implement persistent layered index and chunked binary format
Introduce the `obilayeredmap` specification and persistent MPHF-based index architecture for incremental multi-dataset indexing. Implement chunked binary serialization with a fixed `u8` k-mer count limit (256) and overlapping super-kmer segments. Add memory-mapped I/O and a companion `.idx` index file for allocation-free, O(1) unitig access. Update MkDocs navigation, enhance the k-mer comparison script, and add comprehensive tests for serialization, partitioning, and file I/O pipelines.
2026-05-09 17:38:29 +08:00

123 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""Compare the canonical k-mer sets of two FASTA files.
Reports how many k-mers are shared, exclusive to each file, or missing.
Handles plain and gzip-compressed FASTA (.gz).
Usage
-----
compare_kmers.py -k 31 file_a.fasta.gz file_b.fasta.gz
"""
import argparse
import gzip
import sys
from pathlib import Path
COMP = str.maketrans("ACGTacgt", "TGCAtgca")
def revcomp(seq: str) -> str:
return seq.translate(COMP)[::-1]
def canonical(seq: str) -> str:
rc = revcomp(seq)
return seq if seq <= rc else rc
def open_fasta(path: str):
p = Path(path)
if p.suffix == ".gz":
return gzip.open(path, "rt")
return open(path, "r")
def iter_sequences(path: str):
"""Yield (header, sequence) pairs from a FASTA file."""
header = None
parts = []
with open_fasta(path) as fh:
for line in fh:
line = line.rstrip()
if line.startswith(">"):
if header is not None:
yield header, "".join(parts)
header = line[1:]
parts = []
else:
parts.append(line.upper())
if header is not None:
yield header, "".join(parts)
def extract_kmers(path: str, k: int) -> tuple[set[str], int]:
"""Return (set of canonical k-mers, count of duplicated k-mers) from *path*.
A k-mer is duplicated if its canonical form appears more than once across
all sequences in the file.
"""
counts: dict[str, int] = {}
for _, seq in iter_sequences(path):
for i in range(len(seq) - k + 1):
kmer = seq[i : i + k]
if all(c in "ACGT" for c in kmer):
ck = canonical(kmer)
counts[ck] = counts.get(ck, 0) + 1
duplicated = sum(1 for v in counts.values() if v > 1)
return set(counts), duplicated
def main():
parser = argparse.ArgumentParser(
description="Compare canonical k-mer sets between two FASTA files."
)
parser.add_argument("file_a", help="First FASTA file (reference)")
parser.add_argument("file_b", help="Second FASTA file (to compare)")
parser.add_argument(
"-k",
"--kmer-size",
type=int,
default=31,
metavar="K",
help="k-mer size (default: 31)",
)
args = parser.parse_args()
k = args.kmer_size
print(f"k = {k}")
print(f"A = {args.file_a}")
print(f"B = {args.file_b}")
print()
print("reading A …", file=sys.stderr)
set_a, dup_a = extract_kmers(args.file_a, k)
print("reading B …", file=sys.stderr)
set_b, dup_b = extract_kmers(args.file_b, k)
only_a = set_a - set_b
only_b = set_b - set_a
common = set_a & set_b
print(f"{'kmers in A':<25} {len(set_a):>12,}")
print(f"{'duplicated in A':<25} {dup_a:>12,}")
print(f"{'kmers in B':<25} {len(set_b):>12,}")
print(f"{'duplicated in B':<25} {dup_b:>12,}")
print(f"{'common':<25} {len(common):>12,}")
print(f"{'only in A (lost)':<25} {len(only_a):>12,}")
print(f"{'only in B (gained)':<25} {len(only_b):>12,}")
if only_a or only_b:
print("\nSets differ.", file=sys.stderr)
if len(only_a) > 0 and len(only_b) <= 10:
print(f"\nOnly in A: {only_a}")
if len(only_b) > 0 and len(only_b) <= 10:
print(f"\nOnly in B: {only_b}")
sys.exit(1)
else:
print("\nSets are identical.")
if __name__ == "__main__":
main()