Delete tools/ecoPCRFormat23.py

This commit is contained in:
2025-06-12 13:29:57 +00:00
parent 680cc4d7a2
commit f1ef2caae6


@@ -1,408 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Python 2.7 / 3 compatible version:
Prise en charge des fichiers .gz, parsers GenBank/EMBL/FASTA, écriture binaire.
"""
from __future__ import print_function, division
import sys
import os
import gzip
import struct
import time
import re
import getopt
import zlib
# Python 2/3 compatibility
try:
    basestring
except NameError:
    basestring = str

try:
    raw_input
except NameError:
    raw_input = input
# ---------- File utilities ----------
def universal_open(f):
    if isinstance(f, basestring):
        if f.endswith('.gz'):
            return gzip.open(f, 'rb')
        return open(f, 'r')
    return f

def universal_tell(f):
    # For gzip files, report the position in the underlying (compressed)
    # file so that progress is measured against the on-disk size.
    if isinstance(f, gzip.GzipFile):
        f = f.fileobj
    return f.tell()

def file_size(f):
    # GzipFile does not support seeking from the end; measure the
    # underlying file object instead.
    if isinstance(f, gzip.GzipFile):
        f = f.fileobj
    pos = f.tell()
    f.seek(0, os.SEEK_END)
    length = f.tell()
    f.seek(pos, os.SEEK_SET)
    return length
# ---------- Progress bar ----------
def progress_bar(pos, maxval, reset=False, delta=[None, None]):
    # 'delta' is a mutable default that keeps the start/current timestamps
    # between calls; pass reset=True on the first call.
    if reset:
        delta[0] = time.time()
    delta[1] = time.time()
    elapsed = delta[1] - delta[0]
    percent = float(pos) / maxval * 100 if maxval else 0
    remain = time.strftime('%H:%M:%S',
                           time.gmtime(elapsed / percent * (100 - percent))) if percent else '??:??:??'
    bar = '#' * int(percent / 2)
    bar += '|/-\\-'[int(pos) % 5]
    bar += ' ' * (50 - int(percent / 2))
    sys.stderr.write('\r%5.1f %% |%s| remain : %s' % (percent, bar, remain))
    sys.stderr.flush()
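# Typical call pattern (as used by eco_seq_writer below): initialise the
# timer once with reset=True, then report the current input offset.
#   progress_bar(0, size, reset=True)
#   for entry in entries:
#       ...
#       progress_bar(universal_tell(inp), size)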
# ---------- Sequence parsing ----------
def entry_iterator(fh):
    # Yield one flat-file entry at a time; entries end with a '//' line.
    fh = universal_open(fh)
    buf = []
    for line in fh:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        buf.append(line)
        if line.strip() == '//':
            yield ''.join(buf)
            buf = []
    if buf:
        yield ''.join(buf)

def fasta_iterator(fh):
    fh = universal_open(fh)
    buf = []
    for line in fh:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        if line.startswith('>') and buf:
            yield ''.join(buf)
            buf = []
        buf.append(line)
    if buf:
        yield ''.join(buf)
_fasta_split = re.compile(r';\W*')
_clean_seq = re.compile(r'[ \n0-9]+')
_clean_def = re.compile(r'\nDE +')

def clean_seq(s):
    return _clean_seq.sub('', s)

def clean_def(d):
    # Collapse EMBL 'DE' continuation lines into a single definition string.
    return _clean_def.sub(' ', d)

def parse_fasta(entry):
    lines = entry.strip().split('\n')
    header = lines[0][1:].strip().split(None, 1)
    seqid = header[0]
    rest = header[1] if len(header) > 1 else ''
    parts = _fasta_split.split(rest)
    info = dict(x.split('=', 1) for x in parts if '=' in x)
    definition = ' '.join(p for p in parts if '=' not in p)
    seq = ''.join(l.strip() for l in lines[1:]).upper()
    taxid = int(info['taxid']) if 'taxid' in info else None
    return {'id': seqid, 'taxid': taxid, 'definition': definition, 'sequence': seq}
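# Example of a FASTA header parse_fasta understands (hypothetical accession):
#   >AB123456 taxid=9606; mitochondrial control region
# 'key=value' fields become attributes; the remaining text forms the definition.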
# Regexps for GenBank/EMBL
_gb_ID = re.compile(r'(?m)^LOCUS +([^ ]+)')
_gb_DE = re.compile(r'(?ms)^DEFINITION (.+?)\.\s*(?=[A-Z])')
_gb_ORI = re.compile(r'(?ms)^ORIGIN(.+?)//')
_gb_TX = re.compile(r'taxon:([0-9]+)')
_embl_ID = re.compile(r'(?m)^ID +([^ ;]+)')
_embl_DE = re.compile(r'(?ms)^DE (.+?)\.\s*(?=[A-Z])')
# Anchor on the 'SQ' header line so only the sequence block is captured.
_embl_ORI = re.compile(r'(?ms)^SQ[^\n]*\n(.+?)^//')
_embl_TX = re.compile(r'taxon:([0-9]+)')

def genbank_parser(entry):
    id_ = _gb_ID.search(entry).group(1)
    definition = ' '.join(_gb_DE.search(entry).group(1).split())
    seq = clean_seq(_gb_ORI.search(entry).group(1)).upper()
    tx_match = _gb_TX.search(entry)
    taxid = int(tx_match.group(1)) if tx_match else None
    return {'id': id_, 'taxid': taxid, 'definition': definition, 'sequence': seq}

def embl_parser(entry):
    id_ = _embl_ID.search(entry).group(1)
    definition = ' '.join(clean_def(_embl_DE.search(entry).group(1)).split())
    seq = clean_seq(_embl_ORI.search(entry).group(1)).upper()
    tx_match = _embl_TX.search(entry)
    taxid = int(tx_match.group(1)) if tx_match else None
    return {'id': id_, 'taxid': taxid, 'definition': definition, 'sequence': seq}
def sequence_iterator(parser, iterator):
    def gen(fh):
        for entry in iterator(fh):
            yield parser(entry)
    return gen
# ---------- Binary packers ----------
def eco_seq_packer(sq):
    seq_bytes = sq['sequence'].encode('ascii')
    # zlib (not gzip) compression, so the stream format does not depend on
    # the Python version.
    comp = zlib.compress(seq_bytes, 9)
    idb = sq['id'].encode('ascii')[:20]
    idb += b' ' * (20 - len(idb))
    defb = sq['definition'].encode('utf-8')
    fields = (
        4 + 4 + 20 + 4 + 4 + 4 + len(defb) + len(comp),  # record length, including this field
        sq['taxid'] or 0,
        idb,
        len(defb),
        len(seq_bytes),
        len(comp),
        defb,
        comp
    )
    fmt = '>I I 20s I I I %ds %ds' % (len(defb), len(comp))
    return struct.pack(fmt, *fields)
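# Packed sequence record layout (big-endian), as written above:
#   uint32  record length (including this field)
#   uint32  taxid
#   char20  sequence id, space-padded
#   uint32  definition length
#   uint32  uncompressed sequence length
#   uint32  compressed sequence length
#   bytes   definition, then the zlib-compressed sequence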
def eco_tax_packer(tx):
    name = tx[3].encode('utf-8')
    return struct.pack('>I I I I I %ds' % len(name),
                       4 + 4 + 4 + 4 + 4 + len(name),
                       tx[0], tx[1], tx[2],
                       len(name), name)

def eco_rank_packer(rank):
    b = rank.encode('utf-8')
    return struct.pack('>I %ds' % len(b), len(b), b)

def eco_name_packer(name):
    nm = name[0].encode('utf-8')
    cl = name[1].encode('utf-8')
    return struct.pack('>I I I I I %ds %ds' % (len(nm), len(cl)),
                       4 + 4 + 4 + 4 + 4 + len(nm) + len(cl),
                       int(name[1] == 'scientific name'),
                       len(nm), len(cl),
                       name[2],
                       nm, cl)
# ---------- Writers ----------
def eco_seq_writer(outfile, infile, tax_index, parser):
    out = open(outfile, 'wb')
    inp = universal_open(infile)
    size = file_size(inp)
    entries = parser(inp)
    count = 0
    skipped = []
    # Placeholder for the record count, rewritten once the file is complete.
    out.write(struct.pack('>I', 0))
    progress_bar(0, size, reset=True)
    for e in entries:
        if e['taxid'] is not None and tax_index.get(e['taxid']) is not None:
            out.write(eco_seq_packer(e))
            count += 1
        else:
            skipped.append(e['id'])
        loc = universal_tell(inp)
        progress_bar(loc, size)
    sys.stderr.write('\n')
    out.seek(0)
    out.write(struct.pack('>I', count))
    out.close()
    return skipped
def eco_tax_writer(outfile, taxonomy):
    out = open(outfile, 'wb')
    out.write(struct.pack('>I', len(taxonomy)))
    for tx in taxonomy:
        out.write(eco_tax_packer(tx))
    out.close()

def eco_rank_writer(outfile, ranks):
    out = open(outfile, 'wb')
    out.write(struct.pack('>I', len(ranks)))
    for r in sorted(ranks.keys()):
        out.write(eco_rank_packer(r))
    out.close()

def eco_name_writer(outfile, names):
    out = open(outfile, 'wb')
    out.write(struct.pack('>I', len(names)))
    for nm in sorted(names, key=lambda x: x[0].upper()):
        out.write(eco_name_packer(nm))
    out.close()

def eco_db_writer(prefix, taxonomy, seq_files, parser):
    eco_rank_writer(prefix + '.rdx', taxonomy[1])
    eco_tax_writer(prefix + '.tdx', taxonomy[0])
    eco_name_writer(prefix + '.ndx', taxonomy[2])
    for i, fn in enumerate(seq_files, 1):
        sk = eco_seq_writer('%s_%03d.sdx' % (prefix, i), fn, taxonomy[3], parser)
        if sk:
            print("Skipped entries:", sk, file=sys.stderr)
# ---------- Taxonomy dump ----------
def end_less(it):
    # Yield every item, then repeat the last one forever (used to extend
    # the column-type list over any extra columns).
    lst = list(it)
    for x in lst:
        yield x
    while True:
        yield lst[-1]
class ColumnFile(object):
    def __init__(self, stream, sep=None, strip=True, types=None):
        if isinstance(stream, basestring):
            self.stream = open(stream)
        else:
            try:
                iter(stream)
                self.stream = stream
            except TypeError:
                raise ValueError("stream must be a file name or an iterable")
        self.sep = sep
        self.strip = strip
        self.types = types
        if types:
            self.types = list(types)
            for i, t in enumerate(self.types):
                if t is bool:
                    self.types[i] = ColumnFile.str2bool

    @staticmethod
    def str2bool(x):
        # Accept 'true'/'vrai' spellings as well as the numeric '1' flags
        # used in the NCBI .dmp files.
        return x.strip()[:1].upper() in ('T', 'V', '1')

    def __iter__(self):
        return self

    def next(self):  # Python 2 compatibility
        return self.__next__()

    def __next__(self):
        line = next(self.stream)
        parts = line.split(self.sep)
        if self.strip or self.types:
            parts = [p.strip() for p in parts]
        if self.types:
            types_it = end_less(self.types)
            return [next(types_it)(part) for part in parts]
        return parts
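# Example: reading the pipe-delimited NCBI merged.dmp (the trailing '|' on
# each line produces one extra, empty column):
#   for row in ColumnFile('taxdump/merged.dmp', sep='|', types=(int, int, str)):
#       old_taxid, new_taxid = row[:2]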
def taxon_cmp(a, b):
    return (a[0] > b[0]) - (a[0] < b[0])

def bsearch_taxon(arr, tid):
    lo, hi = 0, len(arr)
    while lo < hi:
        mid = (lo + hi) // 2
        if arr[mid][0] < tid:
            lo = mid + 1
        else:
            hi = mid
    if lo < len(arr) and arr[lo][0] == tid:
        return lo
    return None
def read_node_table(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True,
                    types=(int, int, str, str, str, bool, int, bool, int, bool, bool, bool, str))
    sys.stderr.write("Read taxonomy nodes...\n")
    # nodes.dmp columns: tax_id | parent tax_id | rank | ...
    tax = [[n[0], n[2], n[1]] for n in cf]    # [taxid, rank, parent taxid]
    ranks = sorted(set(x[1] for x in tax))
    rankmap = {r: i for i, r in enumerate(ranks)}
    tax.sort(key=lambda x: x[0])
    idx = {t[0]: bsearch_taxon(tax, t[0]) for t in tax}
    for t in tax:
        t[1] = rankmap[t[1]]      # rank name -> rank index
        t[2] = idx.get(t[2])      # parent taxid -> row index
    return tax, rankmap, idx
def name_iter(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True, types=(int, str, str, str))
    for row in cf:
        # .dmp lines end with '|', which yields a trailing empty column,
        # so slice rather than unpack the whole row.
        tid, name, uniq, cname = row[:4]
        yield tid, name, cname

def merged_iter(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True, types=(int, int, str))
    for row in cf:
        tid, cur = row[:2]
        yield tid, cur

def deleted_iter(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True, types=(int, str))
    for row in cf:
        yield row[0]
def read_taxonomy_dump(taxdir):
    tree, rankmap, idx = read_node_table(os.path.join(taxdir, 'nodes.dmp'))
    sys.stderr.write("Reading names...\n")
    alt = []
    for tid, name, cname in name_iter(os.path.join(taxdir, 'names.dmp')):
        alt.append((name, cname, idx.get(tid)))
        if cname == 'scientific name':
            tree[idx[tid]].append(name)
    sys.stderr.write("Merged...\n")
    for tid, cur in merged_iter(os.path.join(taxdir, 'merged.dmp')):
        idx[tid] = idx.get(cur)
    sys.stderr.write("Deleted...\n")
    for tid in deleted_iter(os.path.join(taxdir, 'delnodes.dmp')):
        idx[tid] = None
    return tree, rankmap, alt, idx
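# read_taxonomy_dump returns the tuple consumed by eco_db_writer:
#   [0] taxa         [taxid, rank index, parent index, scientific name]
#   [1] rank map     rank name -> rank index
#   [2] name list    (name, name class, taxon index)
#   [3] taxid index  taxid -> taxon index (None for deleted taxa)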
# ---------- Options ----------
def parse_opts(args):
    opts = {
        'prefix': 'ecodb',
        'taxdir': 'taxdump',
        'parser': sequence_iterator(genbank_parser, entry_iterator)
    }
    flags, strs = getopt.getopt(args, 'ht:n:gfe',
                                ['help', 'taxonomy=', 'name=', 'genbank', 'fasta', 'embl'])
    for o, v in flags:
        if o in ('-h', '--help'):
            print_help()
            sys.exit()
        elif o in ('-t', '--taxonomy'):
            opts['taxdir'] = v
        elif o in ('-n', '--name'):
            opts['prefix'] = v
        elif o in ('-g', '--genbank'):
            opts['parser'] = sequence_iterator(genbank_parser, entry_iterator)
        elif o in ('-f', '--fasta'):
            opts['parser'] = sequence_iterator(parse_fasta, fasta_iterator)
        elif o in ('-e', '--embl'):
            opts['parser'] = sequence_iterator(embl_parser, entry_iterator)
        else:
            raise ValueError("Unknown opt %s" % o)
    return opts, strs
def print_help():
    h = """ecoPCRFormat23.py [-g|-f|-e] [-t taxdump_dir] [-n prefix] <seqfiles>...
    -g / --genbank  : GenBank parser
    -f / --fasta    : FASTA parser
    -e / --embl     : EMBL parser
    -t / --taxonomy : taxdump directory (nodes.dmp etc.)
    -n / --name     : output prefix"""
    print(h)
if __name__ == '__main__':
    opts, seqs = parse_opts(sys.argv[1:])
    tax = read_taxonomy_dump(opts['taxdir'])
    eco_db_writer(opts['prefix'], tax, seqs, opts['parser'])