#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Python 2.7 / 3 compatible version: supports .gz input files, GenBank/EMBL/FASTA
parsers, and binary output.
"""
from __future__ import print_function, division

import sys
import os
import gzip
import zlib
import struct
import time
import re
import getopt

# Python 2/3 compatibility
try:
    basestring
except NameError:
    basestring = str

try:
    raw_input
except NameError:
    raw_input = input


# ---------- File utilities ----------

def universal_open(f):
    """Open a path (gzip-aware) or pass an already-open stream through."""
    if isinstance(f, basestring):
        if f.endswith('.gz'):
            return gzip.open(f, 'rb')
        return open(f, 'r')
    return f


def universal_tell(f):
    # For gzip files, report the position in the underlying compressed
    # stream, so progress can be measured against the on-disk file size.
    if isinstance(f, gzip.GzipFile):
        f = f.fileobj
    return f.tell()


def file_size(f):
    # Size of the file on disk; for gzip files, measure the compressed stream.
    if isinstance(f, gzip.GzipFile):
        f = f.fileobj
    pos = f.tell()
    f.seek(0, os.SEEK_END)
    length = f.tell()
    f.seek(pos, os.SEEK_SET)
    return length


# ---------- Progress bar ----------

def progress_bar(pos, maxval, reset=False, delta=[None, None]):
    # `delta` is a mutable default argument that keeps the start/current
    # timestamps between calls; call once with reset=True before a new run.
    if reset:
        delta[0] = time.time()
    delta[1] = time.time()
    elapsed = delta[1] - delta[0]
    percent = float(pos) / maxval * 100 if maxval else 0
    if percent:
        remain = time.strftime('%H:%M:%S',
                               time.gmtime(elapsed / percent * (100 - percent)))
    else:
        remain = '??:??:??'
    bar = '#' * int(percent / 2)
    bar += '|/-\\'[int(pos) % 4]          # spinner
    bar += ' ' * (50 - int(percent / 2))
    sys.stderr.write('\r%5.1f %% [%s] remain : %s' % (percent, bar, remain))
    sys.stderr.flush()


# ---------- Sequence parsing ----------

def entry_iterator(fh):
    """Yield flat-file entries (GenBank/EMBL), delimited by '//' lines."""
    fh = universal_open(fh)
    buf = []
    for line in fh:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        buf.append(line)
        if line.strip() == '//':
            yield ''.join(buf)
            buf = []
    if buf and ''.join(buf).strip():      # ignore trailing blank lines
        yield ''.join(buf)


def fasta_iterator(fh):
    """Yield FASTA entries, delimited by '>' header lines."""
    fh = universal_open(fh)
    buf = []
    for line in fh:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        if line.startswith('>') and buf:
            yield ''.join(buf)
            buf = []
        buf.append(line)
    if buf:
        yield ''.join(buf)


_fasta_split = re.compile(r';\W*')
_clean_seq = re.compile(r'[ \n0-9]+')
_clean_def = re.compile(r'\nDE')          # EMBL 'DE' continuation prefixes


def clean_seq(s):
    return _clean_seq.sub('', s)


def clean_def(d):
    return _clean_def.sub('', d)


def parse_fasta(entry):
    lines = entry.strip().split('\n')
    header = lines[0][1:].strip().split(None, 1)
    seqid = header[0]
    rest = header[1] if len(header) > 1 else ''
    parts = _fasta_split.split(rest)
    # 'key=value' fields become attributes; everything else is the definition.
    info = dict(x.split('=', 1) for x in parts if '=' in x)
    definition = ' '.join(p for p in parts if '=' not in p)
    seq = ''.join(l.strip() for l in lines[1:]).upper()
    taxid = int(info['taxid']) if 'taxid' in info else None
    return {'id': seqid, 'taxid': taxid,
            'definition': definition, 'sequence': seq}
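
# A minimal usage sketch for parse_fasta() (hypothetical entry; 'taxid=' is
# the key=value attribute the parser extracts from the header):
#
#   >>> parse_fasta('>AB123 taxid=9606; Homo sapiens mito\nacgt\nACGT\n')
#   {'id': 'AB123', 'taxid': 9606, 'definition': 'Homo sapiens mito',
#    'sequence': 'ACGTACGT'}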

# Regexps for GenBank/EMBL entries
_gb_ID = re.compile(r'(?m)^LOCUS +([^ ]+)')
_gb_DE = re.compile(r'(?ms)^DEFINITION (.+?)\.\s*(?=[A-Z])')
_gb_ORI = re.compile(r'(?ms)^ORIGIN(.+?)//')
_gb_TX = re.compile(r'taxon:([0-9]+)')

_embl_ID = re.compile(r'(?m)^ID +([^ ;]+)')
_embl_DE = re.compile(r'(?ms)^DE (.+?)\.\s*(?=[A-Z])')
_embl_ORI = re.compile(r'(?ms)^\s{2}(.+?)//')
_embl_TX = re.compile(r'taxon:([0-9]+)')


def genbank_parser(entry):
    id_ = _gb_ID.search(entry).group(1)
    definition = ' '.join(_gb_DE.search(entry).group(1).split())
    seq = clean_seq(_gb_ORI.search(entry).group(1)).upper()
    tx_match = _gb_TX.search(entry)
    taxid = int(tx_match.group(1)) if tx_match else None
    return {'id': id_, 'taxid': taxid,
            'definition': definition, 'sequence': seq}


def embl_parser(entry):
    id_ = _embl_ID.search(entry).group(1)
    definition = ' '.join(clean_def(_embl_DE.search(entry).group(1)).split())
    seq = clean_seq(_embl_ORI.search(entry).group(1)).upper()
    tx_match = _embl_TX.search(entry)
    taxid = int(tx_match.group(1)) if tx_match else None
    return {'id': id_, 'taxid': taxid,
            'definition': definition, 'sequence': seq}


def sequence_iterator(parser, iterator):
    """Combine an entry iterator and an entry parser into a record generator."""
    def gen(fh):
        for entry in iterator(fh):
            yield parser(entry)
    return gen


# ---------- Binary packers ----------

def eco_seq_packer(sq, taxidx):
    # `taxidx` is the row index of the taxon in the taxonomy table (which is
    # why eco_seq_writer() looks it up), not the raw NCBI taxid.
    seq_bytes = sq['sequence'].encode('ascii')
    # zlib produces the same stream on Python 2 and 3 (gzip.compress would
    # add a gzip header on Python 3 only).
    comp = zlib.compress(seq_bytes, 9)
    idb = sq['id'].encode('ascii')[:20]
    idb += b' ' * (20 - len(idb))         # pad the id to 20 bytes
    defb = sq['definition'].encode('utf-8')
    fields = (
        4 + 4 + 20 + 4 + 4 + 4 + len(defb) + len(comp),  # record size
        taxidx,
        idb,
        len(defb),
        len(seq_bytes),
        len(comp),
        defb,
        comp,
    )
    fmt = '>I I 20s I I I %ds %ds' % (len(defb), len(comp))
    return struct.pack(fmt, *fields)


def eco_tax_packer(tx):
    # tx is [taxid, rank index, parent index, scientific name]
    name = tx[3].encode('utf-8')
    return struct.pack('>I I I I I %ds' % len(name),
                       4 + 4 + 4 + 4 + 4 + len(name),
                       tx[0], tx[1], tx[2], len(name), name)


def eco_rank_packer(rank):
    b = rank.encode('utf-8')
    return struct.pack('>I %ds' % len(b), len(b), b)


def eco_name_packer(name):
    # name is a (name, name class, taxon index) tuple
    nm = name[0].encode('utf-8')
    cl = name[1].encode('utf-8')
    return struct.pack('>I I I I I %ds %ds' % (len(nm), len(cl)),
                       4 + 4 + 4 + 4 + 4 + len(nm) + len(cl),
                       int(name[1] == 'scientific name'),
                       len(nm), len(cl), name[2],
                       nm, cl)
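
# For reference, a minimal reader for one record written by eco_seq_packer()
# above -- a sketch that simply mirrors the layout packed there (big-endian,
# leading 4-byte record size that includes itself). The name
# `eco_seq_unpacker` is illustrative and not part of the original tool.
def eco_seq_unpacker(stream):
    head = stream.read(4)
    if len(head) < 4:
        return None                       # end of stream
    (size,) = struct.unpack('>I', head)
    body = stream.read(size - 4)
    taxidx, idb, def_len, seq_len, comp_len = struct.unpack('>I 20s I I I',
                                                            body[:36])
    defb = body[36:36 + def_len]
    seq = zlib.decompress(body[36 + def_len:36 + def_len + comp_len])
    assert len(seq) == seq_len
    return {'id': idb.rstrip(b' ').decode('ascii'),
            'taxidx': taxidx,
            'definition': defb.decode('utf-8'),
            'sequence': seq.decode('ascii')}
# Note: a .sdx file written by eco_seq_writer() below starts with a 4-byte
# record count, which a caller would read (or skip) before looping on records.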

# ---------- Writers ----------

def eco_seq_writer(outfile, infile, tax_index, parser):
    out = open(outfile, 'wb')
    inp = universal_open(infile)
    size = file_size(inp)
    entries = parser(inp)
    count = 0
    skipped = []
    out.write(struct.pack('>I', 0))       # placeholder for the record count
    progress_bar(0, size, reset=True)
    for e in entries:
        ti = tax_index.get(e['taxid']) if e['taxid'] is not None else None
        if ti is not None:
            out.write(eco_seq_packer(e, ti))
            count += 1
        else:
            skipped.append(e['id'])       # missing or unknown taxid
        progress_bar(universal_tell(inp), size)
    sys.stderr.write('\n')
    out.seek(0)
    out.write(struct.pack('>I', count))   # backpatch the real count
    out.close()
    return skipped


def eco_tax_writer(outfile, taxonomy):
    out = open(outfile, 'wb')
    out.write(struct.pack('>I', len(taxonomy)))
    for tx in taxonomy:
        out.write(eco_tax_packer(tx))
    out.close()


def eco_rank_writer(outfile, ranks):
    out = open(outfile, 'wb')
    out.write(struct.pack('>I', len(ranks)))
    for r in sorted(ranks.keys()):
        out.write(eco_rank_packer(r))
    out.close()


def eco_name_writer(outfile, names):
    out = open(outfile, 'wb')
    out.write(struct.pack('>I', len(names)))
    for nm in sorted(names, key=lambda x: x[0].upper()):
        out.write(eco_name_packer(nm))
    out.close()


def eco_db_writer(prefix, taxonomy, seq_files, parser):
    # taxonomy is the (tree, ranks, names, index) tuple from read_taxonomy_dump()
    eco_rank_writer(prefix + '.rdx', taxonomy[1])
    eco_tax_writer(prefix + '.tdx', taxonomy[0])
    eco_name_writer(prefix + '.ndx', taxonomy[2])
    for i, fn in enumerate(seq_files, 1):
        sk = eco_seq_writer('%s_%03d.sdx' % (prefix, i), fn, taxonomy[3], parser)
        if sk:
            print("Skipped entries:", sk, file=sys.stderr)


# ---------- Taxonomy dump ----------

def end_less(it):
    """Yield every item of `it`, then repeat the last one forever."""
    lst = list(it)
    for x in lst:
        yield x
    while True:
        yield lst[-1]


class ColumnFile(object):
    """Iterate over a delimited text file, converting columns via `types`."""

    def __init__(self, stream, sep=None, strip=True, types=None):
        if isinstance(stream, basestring):
            self.stream = open(stream)
        else:
            try:
                iter(stream)
                self.stream = stream
            except TypeError:
                raise ValueError("stream must be a file name or an iterable")
        self.sep = sep
        self.strip = strip
        self.types = types
        if types:
            self.types = list(types)
            for i, t in enumerate(self.types):
                if t is bool:
                    self.types[i] = ColumnFile.str2bool

    @staticmethod
    def str2bool(x):
        # NCBI dump flags are '0'/'1'; also accept 'true'/'vrai' spellings.
        x = x.strip()
        return bool(x) and x[0].upper() in ('T', 'V', '1')

    def __iter__(self):
        return self

    def next(self):                       # Python 2 compatibility
        return self.__next__()

    def __next__(self):
        line = next(self.stream)
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        parts = line.split(self.sep)
        if self.strip or self.types:
            parts = [p.strip() for p in parts]
        if self.types:
            # end_less() repeats the last type for any extra trailing columns.
            types_it = end_less(self.types)
            return [next(types_it)(part) for part in parts]
        return parts


def taxon_cmp(a, b):
    # Three-way comparison on taxid.
    return (a[0] > b[0]) - (a[0] < b[0])


def bsearch_taxon(arr, tid):
    lo, hi = 0, len(arr)
    while lo < hi:
        mid = (lo + hi) // 2
        if arr[mid][0] < tid:
            lo = mid + 1
        else:
            hi = mid
    if lo < len(arr) and arr[lo][0] == tid:
        return lo
    return None


def read_node_table(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True,
                    types=(int, int, str, str, str, bool, int,
                           bool, int, bool, bool, bool, str))
    sys.stderr.write("Reading taxonomy nodes...\n")
    tax = [[n[0], n[2], n[1]] for n in cf]        # [taxid, rank, parent taxid]
    ranks = sorted(set(x[1] for x in tax))
    rankmap = {r: i for i, r in enumerate(ranks)}
    tax.sort(key=lambda x: x[0])
    idx = {t[0]: bsearch_taxon(tax, t[0]) for t in tax}   # taxid -> row index
    for t in tax:
        t[1] = rankmap[t[1]]                      # rank name -> rank index
        t[2] = idx.get(t[2])                      # parent taxid -> row index
    return tax, rankmap, idx


def name_iter(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True, types=(int, str, str, str))
    for row in cf:
        # names.dmp rows carry a trailing empty column from the final '|'
        tid, name, uniq, cname = row[:4]
        yield tid, name, cname


def merged_iter(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True, types=(int, int, str))
    for tid, cur, _ in cf:
        yield tid, cur


def deleted_iter(fn):
    fh = universal_open(fn)
    cf = ColumnFile(fh, sep='|', strip=True, types=(int, str))
    for tid, _ in cf:
        yield tid


def read_taxonomy_dump(taxdir):
    tree, rankmap, idx = read_node_table(os.path.join(taxdir, 'nodes.dmp'))
    sys.stderr.write("Reading names...\n")
    alt = []
    for tid, name, cname in name_iter(os.path.join(taxdir, 'names.dmp')):
        alt.append((name, cname, idx.get(tid)))
        if cname == 'scientific name':
            tree[idx[tid]].append(name)
    sys.stderr.write("Reading merged taxa...\n")
    for tid, cur in merged_iter(os.path.join(taxdir, 'merged.dmp')):
        idx[tid] = idx.get(cur)           # redirect old taxids to successors
    sys.stderr.write("Reading deleted taxa...\n")
    for tid in deleted_iter(os.path.join(taxdir, 'delnodes.dmp')):
        idx[tid] = None                   # sequences on deleted taxa are skipped
    return tree, rankmap, alt, idx


# ---------- Options ----------

def parse_opts(args):
    opts = {
        'prefix': 'ecodb',
        'taxdir': 'taxdump',
        'parser': sequence_iterator(genbank_parser, entry_iterator),
    }
    flags, files = getopt.getopt(args, 'ht:n:gfe',
                                 ['help', 'taxonomy=', 'name=',
                                  'genbank', 'fasta', 'embl'])
    for o, v in flags:
        if o in ('-h', '--help'):
            print_help()
            sys.exit()
        elif o in ('-t', '--taxonomy'):
            opts['taxdir'] = v
        elif o in ('-n', '--name'):
            opts['prefix'] = v
        elif o in ('-g', '--genbank'):
            opts['parser'] = sequence_iterator(genbank_parser, entry_iterator)
        elif o in ('-f', '--fasta'):
            opts['parser'] = sequence_iterator(parse_fasta, fasta_iterator)
        elif o in ('-e', '--embl'):
            opts['parser'] = sequence_iterator(embl_parser, entry_iterator)
        else:
            raise ValueError("Unknown option %s" % o)
    return opts, files


def print_help():
    h = """ecoPCRFormat.py [-g|-f|-e] [-t taxdump_dir] [-n prefix] <sequence files>
  -g / --genbank  : parse GenBank flat files (default)
  -f / --fasta    : parse FASTA files
  -e / --embl     : parse EMBL flat files
  -t / --taxonomy : directory holding the NCBI taxdump (nodes.dmp etc.)
  -n / --name     : output file prefix"""
    print(h)


if __name__ == '__main__':
    opts, seqs = parse_opts(sys.argv[1:])
    tax = read_taxonomy_dump(opts['taxdir'])
    eco_db_writer(opts['prefix'], tax, seqs, opts['parser'])
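
# Example invocation (hypothetical paths):
#   python ecoPCRFormat.py -g -t ./taxdump -n mydb seqs1.gb.gz seqs2.gb.gz
# This writes mydb.rdx (ranks), mydb.tdx (taxa) and mydb.ndx (names), plus one
# sequence file per input (mydb_001.sdx, mydb_002.sdx, ...).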