#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import re
import gzip
import struct
import sys
import time
import getopt
import zlib
_dbenable = False  # placeholder for a database back end; unused in this version
#####
#
#
# Generic file functions
#
#
#####
def universalOpen(file):
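    """Open a file by name, transparently handling gzip (.gz) input.

    Accepts either a file name or an already opened file object;
    file objects are returned unchanged.
    """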
if isinstance(file, str):
if file.endswith('.gz'):
if sys.version_info[0] < 3:
rep = gzip.open(file)
else:
rep = gzip.open(file, 'rt', encoding='latin-1')
else:
rep = open(file, 'r')
else:
rep = file
return rep
def universalTell(file):
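    """Return the byte position in the underlying on-disk file.

    For gzip streams this is the position in the compressed file, so
    progress can be measured against the real file size.
    """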
    if hasattr(file, 'buffer'):     # Python 3 text wrapper around a binary stream
        file = file.buffer
    if hasattr(file, 'myfileobj'):  # gzip stream opened by file name
        return file.myfileobj.tell()
    elif hasattr(file, 'fileobj'):  # gzip stream wrapping another file object
        return file.fileobj.tell()
    return file.tell()
def fileSize(file):
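    """Return the size in bytes of the underlying on-disk file.

    Returns None when the stream has no file descriptor (e.g. a pipe).
    """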
    if hasattr(file, 'buffer'):     # Python 3 text wrapper around a binary stream
        file = file.buffer
    if hasattr(file, 'myfileobj'):  # gzip stream opened by file name
        f = file.myfileobj
    elif hasattr(file, 'fileobj'):  # gzip stream wrapping another file object
        f = file.fileobj
    else:
        f = file
if not hasattr(f, 'fileno'):
return None
pos = f.tell()
f.seek(0, 2)
length = f.tell()
f.seek(pos, 0)
return length
def progressBar(pos, max_size, reset=False, delta=[]):
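    """Draw a text progress bar on stderr.

    The mutable default 'delta' deliberately persists between calls and
    stores the start time used to estimate remaining time; pass
    reset=True to restart the timer for a new file.
    """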
if reset:
del delta[:]
if not delta:
delta.append(time.time())
delta.append(time.time())
delta[1] = time.time()
elapsed = delta[1] - delta[0]
if max_size is None:
        sys.stderr.write('\rRead sequences : %d ' % pos)
sys.stderr.flush()
return
percent = float(pos) / max_size * 100 if max_size > 0 else 0
if percent > 0:
remain = elapsed / percent * (100 - percent)
remain_str = time.strftime('%H:%M:%S', time.gmtime(int(remain)))
else:
remain_str = '?'
bar_length = 50
filled = int(percent / 2)
bar = '#' * filled
bar += '|/-\\'[pos % 4]
bar += ' ' * (bar_length - filled - 1)
    sys.stderr.write('\r%5.1f %% |%s| remain : %s' % (percent, bar, remain_str))
sys.stderr.flush()
#####
#
#
# NCBI Dump Taxonomy reader
#
#
#####
class endLessIterator(object):
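    """Iterator over a list that repeats the last element forever.

    The list must be non-empty. (Currently unused in this script.)
    """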
def __init__(self, endedlist):
self.endedlist = endedlist
self.index = 0
def __iter__(self):
return self
def next(self): # Python 2
if self.index < len(self.endedlist):
item = self.endedlist[self.index]
self.index += 1
return item
else:
return self.endedlist[-1]
__next__ = next # Python 3 compatibility
class ColumnFile(object):
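    """Iterate over a delimited text file, yielding one list per line.

    Each field is optionally stripped and converted with the matching
    entry of 'types'; extra fields are returned as plain strings.
    """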
def __init__(self, stream, sep=None, strip=True, types=None):
if isinstance(stream, str):
self._stream = open(stream)
else:
try:
iter(stream)
self._stream = stream
except TypeError:
raise ValueError('stream must be string or an iterator')
self._delimiter = sep
self._strip = strip
self._types = types
@staticmethod
def str2bool(x):
        val = x.strip()[:1].upper()
        if val in ('T', '1', 'Y'):
            return True
        elif val in ('F', '0', 'N'):
            return False
        raise ValueError("Invalid boolean value: %s" % x)
def __iter__(self):
return self
def next(self): # Python 2
ligne = next(self._stream)
data = ligne.split(self._delimiter)
if self._strip:
data = [x.strip() for x in data]
if self._types:
typed_data = []
for i, val in enumerate(data):
if i < len(self._types):
t = self._types[i]
if t is bool:
typed_data.append(self.str2bool(val))
else:
typed_data.append(t(val))
else:
typed_data.append(val)
return typed_data
return data
__next__ = next # Python 3 compatibility
def bsearchTaxon(taxonomy, taxid):
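    """Binary search for 'taxid' in a taxonomy list sorted by taxid.

    Returns the position of the matching entry, or None when absent.
    """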
taxCount = len(taxonomy)
begin = 0
end = taxCount
oldcheck = -1
check = int(begin + (end - begin) / 2)
while check != oldcheck and check < taxCount and taxonomy[check][0] != taxid:
if taxonomy[check][0] < taxid:
begin = check
else:
end = check
oldcheck = check
check = int(begin + (end - begin) / 2)
if check < taxCount and taxonomy[check][0] == taxid:
return check
return None
def readNodeTable(file):
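    """Parse an NCBI nodes.dmp file.

    Returns (taxonomy, rank_dict, index), where each taxonomy entry is
    [taxid, rank index, parent index], rank_dict maps rank names to
    indices, and index maps taxids to positions in the taxonomy list.
    """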
file = universalOpen(file)
nodes = ColumnFile(file,
sep='|',
types=(int, int, str,
str, str, bool,
int, bool, int,
bool, bool, bool, str))
print("Reading taxonomy dump file...", file=sys.stderr)
sys.stderr.flush()
taxonomy = []
for n in nodes:
taxonomy.append([n[0], n[2], n[1]])
print("List all taxonomy rank...", file=sys.stderr)
sys.stderr.flush()
ranks = list(set(x[1] for x in taxonomy))
ranks.sort()
rank_dict = {rank: idx for idx, rank in enumerate(ranks)}
print("Sorting taxons...", file=sys.stderr)
sys.stderr.flush()
taxonomy.sort(key=lambda x: x[0])
print("Indexing taxonomy...", file=sys.stderr)
sys.stderr.flush()
index = {}
for i, t in enumerate(taxonomy):
index[t[0]] = i
print("Indexing parent and rank...", file=sys.stderr)
sys.stderr.flush()
for t in taxonomy:
t[1] = rank_dict[t[1]]
t[2] = index.get(t[2], None)
return taxonomy, rank_dict, index
def nameIterator(file):
file = universalOpen(file)
names = ColumnFile(file,
sep='|',
types=(int, str, str, str))
for row in names:
if len(row) >= 4:
taxid, name, unique, classname = row[:4]
yield taxid, name, classname
def mergedNodeIterator(file):
file = universalOpen(file)
merged = ColumnFile(file,
sep='|',
types=(int, int, str))
for row in merged:
if len(row) >= 2:
taxid, current = row[:2]
yield taxid, current
def deletedNodeIterator(file):
file = universalOpen(file)
deleted = ColumnFile(file,
sep='|',
types=(int, str))
for row in deleted:
if row:
taxid = row[0]
yield taxid
def readTaxonomyDump(taxdir):
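    """Load a full NCBI taxonomy dump directory.

    Reads nodes.dmp, names.dmp, merged.dmp and delnodes.dmp, and returns
    (taxonomy, ranks, alternativeName, index), with merged taxids aliased
    to their current entry and deleted taxids mapped to None.
    """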
taxonomy, ranks, index = readNodeTable(taxdir + '/nodes.dmp')
# Initialize scientific name as empty string for each taxon
for tx in taxonomy:
tx.append("") # Add fourth element: scientific name (empty at start)
print("Adding scientific name...", file=sys.stderr)
sys.stderr.flush()
alternativeName = []
for taxid, name, classname in nameIterator(taxdir + '/names.dmp'):
idx = index.get(taxid, None)
if idx is not None:
# Clean and ensure ASCII only
name_clean = ''.join(c for c in name if ord(c) < 128)
alternativeName.append((name_clean, classname, idx))
if classname == 'scientific name':
taxonomy[idx][3] = name_clean
print("Adding taxid alias...", file=sys.stderr)
sys.stderr.flush()
for taxid, current in mergedNodeIterator(taxdir + '/merged.dmp'):
if current in index:
index[taxid] = index[current]
print("Adding deleted taxid...", file=sys.stderr)
sys.stderr.flush()
for taxid in deletedNodeIterator(taxdir + '/delnodes.dmp'):
index[taxid] = None
return taxonomy, ranks, alternativeName, index
#####
#
#
# Genbank/EMBL sequence reader
#
#
#####
def entryIterator(file):
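    """Yield raw GenBank/EMBL entries, each terminated by a '//' line."""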
file = universalOpen(file)
rep = []
ligne = file.readline()
while ligne:
rep.append(ligne)
        if ligne.rstrip() == '//':
yield ''.join(rep)
rep = []
ligne = file.readline()
if rep:
yield ''.join(rep)
def fastaEntryIterator(file):
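    """Yield raw FASTA entries, each starting at a '>' header line."""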
file = universalOpen(file)
rep = []
ligne = file.readline()
while ligne:
if ligne.startswith('>') and rep:
yield ''.join(rep)
rep = []
rep.append(ligne)
ligne = file.readline()
if rep:
yield ''.join(rep)
_cleanSeq = re.compile(r'[\s\d]')  # strips whitespace and digits from raw sequence blocks
def cleanSeq(seq):
return _cleanSeq.sub('', seq)
# GenBank patterns
_gbParseID = re.compile(r'(?<=^LOCUS {7})[^ ]+(?= )', re.MULTILINE)
_gbParseDE = re.compile(r'(?<=^DEFINITION {2}).+?\. *$', re.MULTILINE | re.DOTALL)
_gbParseSQ = re.compile(r'(?<=^ORIGIN).+?(?=^//$)', re.MULTILINE | re.DOTALL)
_gbParseTX = re.compile(r'(?<= /db_xref="taxon:)[0-9]+(?=")')
def genbankEntryParser(entry):
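    """Extract id, taxid, definition and sequence from one raw GenBank entry."""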
Id_match = _gbParseID.findall(entry)
Id = Id_match[0] if Id_match else "UNKNOWN_ID"
De_match = _gbParseDE.findall(entry)
De = ' '.join(De_match[0].split()) if De_match else ""
Sq_match = _gbParseSQ.findall(entry)
Sq = cleanSeq(Sq_match[0].upper()) if Sq_match else ""
Tx_match = _gbParseTX.findall(entry)
Tx = int(Tx_match[0]) if Tx_match else None
# Clean and ensure ASCII only
Id = ''.join(c for c in Id if ord(c) < 128)
De = ''.join(c for c in De if ord(c) < 128)
return {'id': Id, 'taxid': Tx, 'definition': De, 'sequence': Sq}
# EMBL patterns
_emblParseID = re.compile(r'(?<=^ID {3})[^ ]+(?=;)', re.MULTILINE)
_emblParseDE = re.compile(r'(?<=^DE {3}).+?\. *$', re.MULTILINE | re.DOTALL)
_emblParseSQ = re.compile(r'(?<=^     ).+?(?=^//$)', re.MULTILINE | re.DOTALL)  # sequence lines are indented by five spaces
_emblParseTX = re.compile(r'(?<= /db_xref="taxon:)[0-9]+(?=")')
def emblEntryParser(entry):
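    """Extract id, taxid, definition and sequence from one raw EMBL entry."""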
Id_match = _emblParseID.findall(entry)
Id = Id_match[0] if Id_match else "UNKNOWN_ID"
De_match = _emblParseDE.findall(entry)
De = ' '.join(De_match[0].replace('\nDE ', ' ').split()) if De_match else ""
Sq_match = _emblParseSQ.findall(entry)
Sq = cleanSeq(Sq_match[0].upper()) if Sq_match else ""
Tx_match = _emblParseTX.findall(entry)
Tx = int(Tx_match[0]) if Tx_match else None
# Clean and ensure ASCII only
Id = ''.join(c for c in Id if ord(c) < 128)
De = ''.join(c for c in De if ord(c) < 128)
return {'id': Id, 'taxid': Tx, 'definition': De, 'sequence': Sq}
# FASTA processing
_fastaSplit = re.compile(r';\W*')
def parseFasta(seq):
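    """Split a FASTA entry into (id, sequence, definition, info).

    'key=value' fields separated by ';' in the header are collected into
    the info dict; the remaining header text becomes the definition.
    """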
lines = seq.split('\n')
if not lines:
return "", "", "", {}
header = lines[0].strip()
if header.startswith('>'):
header = header[1:]
parts = header.split(None, 1)
seq_id = parts[0] if parts else ""
description = parts[1] if len(parts) > 1 else ""
fields = _fastaSplit.split(description) if description else []
info = {}
other_desc = []
for field in fields:
if '=' in field:
key, val = field.split('=', 1)
info[key.strip()] = val.strip()
else:
other_desc.append(field)
definition = ' '.join(other_desc)
sequence = ''.join(x.strip() for x in lines[1:]).upper()
# Clean and ensure ASCII only
seq_id = ''.join(c for c in seq_id if ord(c) < 128)
definition = ''.join(c for c in definition if ord(c) < 128)
return seq_id, sequence, definition, info
def fastaEntryParser(entry):
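    """Parse one FASTA entry into the same record dict as the other parsers."""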
seq_id, sequence, definition, info = parseFasta(entry)
Tx = info.get('taxid', None)
if Tx is not None:
try:
Tx = int(Tx)
except ValueError:
Tx = None
return {'id': seq_id, 'taxid': Tx, 'definition': definition, 'sequence': sequence}
def sequenceIteratorFactory(entryParser, entryIterator):
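    """Bind an entry parser to an entry iterator, returning a function
    that yields one parsed record dict per entry in a file."""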
def sequenceIterator(file):
for entry in entryIterator(file):
yield entryParser(entry)
return sequenceIterator
#####
#
#
# Binary writer
#
#
#####
def ecoSeqPacker(sq):
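    """Pack one sequence record for a .sdx file.

    Record layout written by this script (all integers big-endian):
    record size (excluding this field), taxon index, 20-byte padded id,
    definition length, raw sequence length, compressed sequence length,
    then the definition bytes and the zlib-compressed sequence.
    """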
# Ensure sequence is ASCII uppercase
sequence = ''.join(c for c in sq['sequence'] if ord(c) < 128).upper()
seq_bytes = sequence.encode('ascii')
compactseq = zlib.compress(seq_bytes, 9)
cptseqlength = len(compactseq)
# Clean definition to pure ASCII
definition = ''.join(c for c in sq['definition'] if ord(c) < 128)
de_bytes = definition.encode('ascii')
delength = len(de_bytes)
# Clean ID to pure ASCII and pad to 20 bytes
seq_id = ''.join(c for c in sq['id'] if ord(c) < 128)
seq_id_bytes = seq_id.encode('ascii')
if len(seq_id_bytes) > 20:
seq_id_bytes = seq_id_bytes[:20]
else:
seq_id_bytes = seq_id_bytes.ljust(20, b'\0')
    # Total record size, excluding this leading 4-byte size field:
    # taxid (4) + id (20) + three length fields (12) + definition + compressed sequence
totalSize = 4 + 20 + 4 + 4 + 4 + delength + cptseqlength
# Pack with correct byte order and field order
packed = struct.pack('>I', totalSize)
packed += struct.pack('>I', sq['taxid'])
packed += seq_id_bytes
packed += struct.pack('>I', delength)
packed += struct.pack('>I', len(sequence))
packed += struct.pack('>I', cptseqlength)
packed += de_bytes
packed += compactseq
return packed
def ecoTaxPacker(tx):
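    """Pack one taxon record for a .tdx file: record size, taxid,
    rank index, parent index, name length, then the name bytes."""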
# Clean name to pure ASCII
name_clean = ''.join(c for c in tx[3] if ord(c) < 128)
name_bytes = name_clean.encode('ascii')
namelength = len(name_bytes)
totalSize = 4 + 4 + 4 + 4 + namelength
packed = struct.pack('>I', totalSize)
packed += struct.pack('>I', tx[0]) # taxid
packed += struct.pack('>I', tx[1]) # rank index
packed += struct.pack('>I', tx[2]) # parent index
packed += struct.pack('>I', namelength)
packed += name_bytes
return packed
def ecoRankPacker(rank):
# Clean rank to pure ASCII
rank_clean = ''.join(c for c in rank if ord(c) < 128)
rank_bytes = rank_clean.encode('ascii')
namelength = len(rank_bytes)
packed = struct.pack('>I', namelength)
packed += rank_bytes
return packed
def ecoNamePacker(name):
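    """Pack one name record for a .ndx file: record size, a
    scientific-name flag, name length, class length, taxon index,
    then the name and class bytes."""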
# Clean name and class to pure ASCII
name_clean = ''.join(c for c in name[0] if ord(c) < 128)
class_clean = ''.join(c for c in name[1] if ord(c) < 128)
name_bytes = name_clean.encode('ascii')
class_bytes = class_clean.encode('ascii')
namelength = len(name_bytes)
classlength = len(class_bytes)
totalSize = 4 + 4 + 4 + 4 + namelength + classlength
packed = struct.pack('>I', totalSize)
packed += struct.pack('>I', 1 if name[1] == 'scientific name' else 0)
packed += struct.pack('>I', namelength)
packed += struct.pack('>I', classlength)
packed += struct.pack('>I', name[2]) # taxon index
packed += name_bytes
packed += class_bytes
return packed
def ecoSeqWriter(file, input_file, taxindex, parser):
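    """Write all sequences with a resolvable taxid to a .sdx file.

    The leading 4-byte record count is written as a placeholder and
    patched once the real count is known; entries whose taxid cannot
    be resolved through 'taxindex' are skipped and their ids returned.
    """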
output = open(file, 'wb')
input_stream = universalOpen(input_file)
inputsize = fileSize(input_stream)
entries = parser(input_stream)
seqcount = 0
skipped = []
# Write placeholder for sequence count
output.write(struct.pack('>I', 0))
progressBar(0, inputsize, reset=True)
for entry in entries:
if entry['taxid'] is not None:
try:
# Convert to integer taxid
taxid_int = int(entry['taxid'])
# Lookup in taxindex
entry['taxid'] = taxindex.get(taxid_int, None)
except (ValueError, TypeError):
entry['taxid'] = None
if entry['taxid'] is not None:
seqcount += 1
packed = ecoSeqPacker(entry)
output.write(packed)
else:
skipped.append(entry['id'])
# Update progress
if inputsize is not None:
where = universalTell(input_stream)
progressBar(where, inputsize)
else:
progressBar(seqcount, seqcount + 1) # Fake progress
sys.stderr.write(" Readed sequences : %d \r" % seqcount)
sys.stderr.flush()
print("\n", file=sys.stderr)
# Update sequence count at beginning of file
output.seek(0)
output.write(struct.pack('>I', seqcount))
output.close()
return skipped
def ecoTaxWriter(file, taxonomy):
output = open(file, 'wb')
output.write(struct.pack('>I', len(taxonomy)))
for tx in taxonomy:
output.write(ecoTaxPacker(tx))
output.close()
def ecoRankWriter(file, ranks):
output = open(file, 'wb')
rank_list = sorted(ranks.keys())
output.write(struct.pack('>I', len(rank_list)))
for rank in rank_list:
output.write(ecoRankPacker(rank))
output.close()
def ecoNameWriter(file, names):
output = open(file, 'wb')
output.write(struct.pack('>I', len(names)))
# Sort case-insensitive
names.sort(key=lambda x: x[0].lower())
for name in names:
output.write(ecoNamePacker(name))
output.close()
def ecoDBWriter(prefix, taxonomy, seqFileNames, parser):
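    """Write the complete ecoPCR database for 'prefix': .rdx (ranks),
    .tdx (taxa) and .ndx (names) files, plus one numbered .sdx file
    per input sequence file."""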
ecoRankWriter(prefix + '.rdx', taxonomy[1])
ecoTaxWriter(prefix + '.tdx', taxonomy[0])
ecoNameWriter(prefix + '.ndx', taxonomy[2])
filecount = 0
for filename in seqFileNames:
filecount += 1
outfile = "%s_%03d.sdx" % (prefix, filecount)
print("\nProcessing:", filename, file=sys.stderr)
skipped = ecoSeqWriter(outfile, filename, taxonomy[3], parser)
if skipped:
print("\nSkipped entries without valid taxonomy:", file=sys.stderr)
for sid in skipped:
print(sid, file=sys.stderr)
else:
print("All entries processed successfully", file=sys.stderr)
def ecoParseOptions(arguments):
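    """Parse command-line arguments into an option dict plus the list
    of input file names. Defaults: GenBank parser, prefix 'ecodb',
    taxonomy directory 'taxdump'."""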
opt = {
'prefix': 'ecodb',
'taxdir': 'taxdump',
'parser': sequenceIteratorFactory(genbankEntryParser, entryIterator)
}
try:
        o, filenames = getopt.getopt(arguments,
                                     'ht:n:gfe',
                                     ['help',
                                      'taxonomy=',
                                      'name=',
                                      'genbank',
                                      'fasta',
                                      'embl'])
except getopt.GetoptError as err:
print(str(err), file=sys.stderr)
printHelp()
sys.exit(2)
for name, value in o:
if name in ('-h', '--help'):
printHelp()
sys.exit()
elif name in ('-t', '--taxonomy'):
opt['taxdir'] = value
elif name in ('-n', '--name'):
opt['prefix'] = value
elif name in ('-g', '--genbank'):
opt['parser'] = sequenceIteratorFactory(genbankEntryParser, entryIterator)
elif name in ('-f', '--fasta'):
opt['parser'] = sequenceIteratorFactory(fastaEntryParser, fastaEntryIterator)
elif name in ('-e', '--embl'):
opt['parser'] = sequenceIteratorFactory(emblEntryParser, entryIterator)
else:
raise ValueError('Unknown option: %s' % name)
return opt, filenames
def printHelp():
print("-----------------------------------")
print(" ecoPCRFormat.py")
print("-----------------------------------")
print("Converts sequence databases to ecoPCR format")
print("Usage: ecoPCRFormat.py [options] <files>")
print("Options:")
print(" -e, --embl Input in EMBL format")
print(" -f, --fasta Input in FASTA format")
print(" -g, --genbank Input in GenBank format (default)")
print(" -h, --help Show this help message")
print(" -n NAME, --name=NAME Database prefix (default: ecodb)")
print(" -t DIR, --taxonomy=DIR Taxonomy directory (default: taxdump)")
print("-----------------------------------")
if __name__ == '__main__':
try:
opt, filenames = ecoParseOptions(sys.argv[1:])
if not filenames:
print("Error: No input files specified", file=sys.stderr)
printHelp()
sys.exit(1)
print("Loading taxonomy...", file=sys.stderr)
taxonomy = readTaxonomyDump(opt['taxdir'])
print("Processing sequence files...", file=sys.stderr)
ecoDBWriter(opt['prefix'], taxonomy, filenames, opt['parser'])
print("Database creation complete", file=sys.stderr)
except Exception as e:
print("Error: " + str(e), file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)