New command: obi annotate

This commit is contained in:
Celine Mercier
2018-03-21 16:39:31 +01:00
parent 123fb9d7ba
commit 2684535e26

View File

@ -0,0 +1,340 @@
#cython: language_level=3
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
from obitools3.dms import DMS
from obitools3.dms.view.view cimport View, Line_selection
from obitools3.uri.decode import open_uri
from obitools3.apps.optiongroups import addMinimalInputOption, addTaxonomyInputOption, addMinimalOutputOption
from obitools3.dms.view import RollbackException
from functools import reduce
from obitools3.apps.config import logger
from obitools3.utils cimport tobytes
from obitools3.dms.capi.obiview cimport QUALITY_COLUMN
import time
import math
__title__="Annotate views with new tags and edit existing annotations"
def addOptions(parser):
addMinimalInputOption(parser)
addTaxonomyInputOption(parser)
addMinimalOutputOption(parser)
group=parser.add_argument_group('obi annotate specific options')
group.add_argument('--seq-rank', # TODO seq/elt/line???
action="store_true",
dest="annotate:add_rank",
default=False,
help="Add a rank attribute to the sequence "
"indicating the sequence position in the data.")
group.add_argument('-R', '--rename-tag',
action="append",
dest="annotate:rename_tags",
metavar="<OLD_NAME:NEW_NAME>",
type=str,
default=[],
help="Change tag name from OLD_NAME to NEW_NAME.")
group.add_argument('-D', '--delete-tag',
action="append",
dest="annotate:delete_tags",
metavar="<TAG_NAME>",
type=str,
default=[],
help="Delete tag TAG_NAME.")
group.add_argument('-S', '--set-tag',
action="append",
dest="annotate:set_tags",
metavar="<TAG_NAME:PYTHON_EXPRESSION>",
type=str,
default=[],
help="Add a new tag named TAG_NAME with "
"a value computed from PYTHON_EXPRESSION.")
group.add_argument('--set-identifier',
action="store",
dest="annotate:set_identifier",
metavar="<PYTHON_EXPRESSION>",
type=str,
default=None,
help="Set sequence identifier with "
"a value computed from PYTHON_EXPRESSION.")
group.add_argument('--set-sequence',
action="store",
dest="annotate:set_sequence",
metavar="<PYTHON_EXPRESSION>",
type=str,
default=None,
help="Change the sequence itself with "
"a value computed from PYTHON_EXPRESSION.")
group.add_argument('--set-definition',
action="store",
dest="annotate:set_definition",
metavar="<PYTHON_EXPRESSION>",
type=str,
default=None,
help="Set sequence definition with "
"a value computed from PYTHON_EXPRESSION.")
group.add_argument('--run',
action="store",
dest="annotate:run",
metavar="<PYTHON_EXPRESSION>",
type=str,
default=None,
help="Run a python expression on each element.")
group.add_argument('-C', '--clear',
action="store_true",
dest="annotate:clear",
default=False,
help="Clear all tags except the obligatory ones.")
group.add_argument('-k','--keep',
action='append',
dest="annotate:keep",
metavar="<TAG>",
default=[],
type=str,
help="Only keep this tag. (Can be specified several times.)")
group.add_argument('--length',
action="store_true",
dest="annotate:length",
default=False,
help="Add 'seq_length' tag with sequence length.")
group.add_argument('--with-taxon-at-rank',
action='append',
dest="annotate:taxon_at_rank",
metavar="<RANK_NAME>",
default=[],
type=str,
help="Add taxonomy annotation at the specified rank level RANK_NAME.")
def sequenceTaggerGenerator(config, taxo=None):
toSet=None
newId=None
newDef=None
newSeq=None
length=None
add_rank=None
run=None
if 'set_tags' in config['annotate']: # TODO default option problem, to fix
toSet = [x.split(':',1) for x in config['annotate']['set_tags'] if len(x.split(':',1))==2]
if 'set_identifier' in config['annotate']:
newId = config['annotate']['set_identifier']
if 'set_definition' in config['annotate']:
newDef = config['annotate']['set_definition']
if 'set_sequence' in config['annotate']:
newSeq = config['annotate']['set_sequence']
if 'length' in config['annotate']:
length = config['annotate']['length']
if 'add_rank' in config["annotate"]:
add_rank = config["annotate"]["add_rank"]
if 'run' in config['annotate']:
run = config['annotate']['run']
counter = [0]
for i in range(len(toSet)):
for j in range(len(toSet[i])):
toSet[i][j] = tobytes(toSet[i][j])
annoteRank=[]
if 'taxon_at_rank' in config['annotate']:
if taxo is not None:
annoteRank = config['annotate']['taxon_at_rank']
else:
raise Exception("A taxonomy must be provided to annotate taxon ranks")
def sequenceTagger(seq):
if counter[0]>=0:
counter[0]+=1
for rank in annoteRank:
if 'taxid' in seq:
taxid = seq['taxid']
if taxid is not None:
rtaxid = taxo.get_taxon_at_rank(taxid, rank)
if rtaxid is not None:
scn = taxo.get_scientific_name(rtaxid)
else:
scn=None
seq[rank]=rtaxid
seq["%s_name"%rank]=scn
if add_rank:
seq['seq_rank']=counter[0]
for i,v in toSet:
#try:
if taxo is not None:
environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
else:
environ = {'sequence':seq, 'counter':counter[0], 'math':math}
val = eval(v, environ, seq)
#except Exception,e: # TODO discuss usefulness of this
# if options.onlyValid:
# raise e
# val = v
seq[i]=val
if length:
seq['seq_length']=len(seq)
if newId is not None:
# try:
if taxo is not None:
environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
else:
environ = {'sequence':seq, 'counter':counter[0], 'math':math}
val = eval(newId, environ, seq)
# except Exception,e:
# if options.onlyValid:
# raise e
# val = newId
seq.id=val
if newDef is not None:
# try:
if taxo is not None:
environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
else:
environ = {'sequence':seq, 'counter':counter[0], 'math':math}
val = eval(newDef, environ, seq)
# except Exception,e:
# if options.onlyValid:
# raise e
# val = newDef
seq.definition=val
#
if newSeq is not None:
# try:
if taxo is not None:
environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
else:
environ = {'sequence':seq, 'counter':counter[0], 'math':math}
val = eval(newSeq, environ, seq)
# except Exception,e:
# if options.onlyValid:
# raise e
# val = newSeq
seq.seq=val
if 'seq_length' in seq:
seq['seq_length']=len(seq)
# Delete quality since it must match the sequence.
# TODO discuss deleting for each sequence separately
if QUALITY_COLUMN in seq:
seq.view.delete_column(QUALITY_COLUMN)
if run is not None:
# try:
if taxo is not None:
environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
else:
environ = {'sequence':seq, 'counter':counter[0], 'math':math}
eval(run, environ, seq)
# except Exception,e:
# if options.onlyValid:
# raise e
return sequenceTagger
def run(config):
DMS.obi_atexit()
logger("info", "obi annotate")
# Open the input
input = open_uri(config['obi']['inputURI'])
if input is None:
raise Exception("Could not read input view")
i_view = input[1]
# Read the name of the output view
uri = config['obi']['outputURI'].split('/')
if len(uri)==2:
# Check that input and output DMS are the same (predicate, to discuss)
if config['obi']['inputURI'].split('/')[0] != uri[0]:
raise Exception("Input and output DMS must be the same")
output_view_name = uri[1]
else:
output_view_name = uri[0]
# Clone output view from input view
o_view = i_view.clone(output_view_name, comments=i_view.comments+b"\nobi annotate") # TODO comments
if o_view is None:
raise Exception("Couldn't create output view")
if 'taxoURI' in config['obi'] : # TODO default None problem
taxo_uri = open_uri(config['obi']['taxoURI'])
if taxo_uri is None:
raise Exception("Couldn't open taxonomy")
taxo = taxo_uri[1]
else :
taxo = None
# Initialize the progress bar
pb = ProgressBar(len(o_view), config, seconde=5)
try:
# Apply editions
# Editions at view level
if 'delete_tags' in config['annotate']:
toDelete = config['annotate']['delete_tags'][:]
if 'rename_tags' in config['annotate']:
toRename = [x.split(':',1) for x in config['annotate']['rename_tags'] if len(x.split(':',1))==2]
if 'clear' in config['annotate']:
clear = config['annotate']['clear']
if 'keep' in config['annotate']:
keep = config['annotate']['keep']
for i in range(len(toDelete)):
toDelete[i] = tobytes(toDelete[i])
for i in range(len(toRename)):
for j in range(len(toRename[i])):
toRename[i][j] = tobytes(toRename[i][j])
for i in range(len(keep)):
keep[i] = tobytes(keep[i])
keep = set(keep)
if clear or keep:
for k in o_view.keys():
if k not in keep:
o_view.delete_column(k)
else:
for k in toDelete:
o_view.delete_column(k)
for old_name, new_name in toRename:
if old_name in o_view:
o_view.rename_column(old_name, new_name)
# Editions at line level
sequenceTagger = sequenceTaggerGenerator(config, taxo=taxo)
for i in range(len(o_view)):
pb(i)
sequenceTagger(o_view[i])
except Exception, e:
raise RollbackException("obi annotate error, rollbacking view: "+str(e), o_view)
print("\n")
print(repr(o_view))
input[0].close()
# output[0].close()