Files
obitools3/python/obitools3/uri/decode.pyx
Celine Mercier 00993d4215 Cython API: fixed a bug where the quality format would not be read
properly from the configuration values
2018-02-12 14:42:30 +01:00

407 lines
13 KiB
Cython

#cython: language_level=3
from urllib.parse import urlparse, urlunparse, parse_qs, ParseResultBytes
from os.path import isdir, isfile, basename, join
from obitools3.dms.dms import DMS
from obitools3.parsers.fasta import fastaNucIterator
from obitools3.parsers.fastq import fastqIterator
from obitools3.parsers.universal import entryIteratorFactory
from obitools3.dms.obiseq import Nuc_Seq
from obitools3.apps.config import getConfiguration,logger
from obitools3.apps.temp import get_temp_dms
class MalformedURIException(RuntimeError):
    """
    Error raised by the URI decoding functions of this module when a URI
    cannot be interpreted: too many path components, or a qualifier whose
    value is not acceptable.
    """
cdef open_dms(bytes path, bint create=False):
    """
    Locates the DMS designated by the leading part of a URI path.

    The path is cut at each successive '/' (the search starts at index 1,
    so a '/' in first position is never a cut point) and, last, taken as
    a whole. For every candidate prefix that is not itself an existing
    directory:

      - if the directory '<prefix>.obidms' exists and contains the file
        '<basename of prefix>_infos', the DMS is opened with DMS.open();
      - if the directory '<prefix>.obidms' does not exist and `create` is
        true, the DMS is created with DMS.new().

    Returns a tuple (dms, remainder), where remainder is the part of
    `path` located after the '/' following the prefix (b'' when the whole
    path was used), or None when no candidate matched.
    """
    cdef int pos = 1
    cdef bytes candidate
    cdef bytes remainder
    cdef bytes dms_dir
    cdef bytes dms_name

    while pos > 0:
        pos = path.find(b"/", pos)

        # Split the path at the current '/', or use it whole when there is
        # no further '/'.
        if pos > 0:
            candidate = path[:pos]
            remainder = path[pos + 1:]
        else:
            candidate = path
            remainder = b''

        # An existing plain directory is only a path component: skip it.
        if not isdir(candidate):
            dms_dir = candidate + b".obidms"
            if isdir(dms_dir):
                dms_name = basename(candidate)
                if isfile(join(dms_dir, dms_name + b"_infos")):
                    return (DMS.open(candidate), remainder)
            elif create:
                return (DMS.new(candidate), remainder)

        # Resume the search after the current '/'; when no '/' was found,
        # pos is -1 here and becomes 0, which ends the loop.
        pos += 1

    return None
def open_dms_element(DMS dms, bytes path,
                     bint create=False,
                     type newviewtype=View):
    """
    Resolves the element of a DMS designated by the part of a URI path
    that follows the DMS itself.

    Recognised forms of `path` (components separated by '/'):

      - empty path: the DMS itself;
      - taxonomy/taxoname[/taxid]: a taxonomy, or one of its taxa;
      - viewname[/columnname|#line|*[/#line|columnname|*[/subcolumn]]]:
        a view, or an element reached by successive indexing of the view.

    The view is obtained with newviewtype.new() when `create` is true and
    with newviewtype.open() otherwise.

    Returns a tuple (dms, element).

    Raises MalformedURIException when the path has too many components,
    and NotImplementedError for the four-component forms using '*'.
    """
    cdef list path_parts = path.split(b'/')
    cdef int nparts = len(path_parts)

    # The URI is only composed of a DMS
    if not path:
        return (dms, dms)

    # The URI targets a taxonomy
    # dms:dmspath/taxonomy/taxoname[/taxid]
    # (a lone "taxonomy" component falls through and is read as a view name)
    if path_parts[0] == b"taxonomy" and nparts > 1:
        taxo = Taxonomy.open(dms, path_parts[1])
        if nparts == 3:
            return (dms, taxo[int(path_parts[2])])
        if nparts > 3:
            raise MalformedURIException('Malformed Taxonomy URI')
        return (dms, taxo)

    # The URI targets a view
    # dms:dmspath/viewname[/columnname|#line|*[/#line|columnname|*[/subcolumn]]]
    if create:
        view = newviewtype.new(dms, path_parts[0])
    else:
        view = newviewtype.open(dms, path_parts[0])

    if nparts == 1:
        return (dms, view)

    # Second component is '*'
    if path_parts[1] == b'*':
        if nparts == 2:
            return (dms, view)
        column = view[path_parts[2]]
        if nparts == 3:
            return (dms, column)
        if nparts == 4:
            raise NotImplementedError()
        raise MalformedURIException('Malformed View * URI')

    # Second component: used as an integer index when it parses as one,
    # as a bytes key otherwise.
    try:
        key = int(path_parts[1])
    except ValueError:
        key = path_parts[1]
    part = view[key]

    if nparts == 2:
        return (dms, part)

    # Third component: a Column is indexed by integer (or '*'), anything
    # else by the raw bytes key.
    if isinstance(part, Column):
        if path_parts[2] == b"*":
            if nparts == 4:
                raise NotImplementedError()
            if nparts == 3:
                return (dms, part)
            raise MalformedURIException('Malformed View * URI')
        subpart = part[int(path_parts[2])]
    else:
        subpart = part[path_parts[2]]

    if nparts == 3:
        return (dms, subpart)

    # Fourth component: same integer-or-key rule as the second one.
    try:
        subkey = int(path_parts[3])
    except ValueError:
        subkey = path_parts[3]
    element = subpart[subkey]

    # URI with too many sub-parts
    if nparts > 4:
        raise MalformedURIException('Malformed View URI')

    return (dms, element)
def _bool_qualifier(dict qualifiers, config, str name, default):
    """
    Resolves one boolean option of a file URI.

    The value is taken from the URI qualifier `name` when present, else
    from config["obi"][name], else `default`.

    Raises MalformedURIException when the qualifier cannot be evaluated
    or when the resulting value is not a bool.
    """
    cdef bytes key = name.encode()

    if key in qualifiers:
        try:
            # SECURITY NOTE(review): eval() executes whatever expression is
            # written in the URI. It is kept to preserve the accepted
            # syntax, but must not be fed URIs coming from an untrusted
            # source; consider restricting it to the literals True/False.
            value = eval(qualifiers[key][0])
        except Exception as e:
            raise MalformedURIException('Malformed %s argument in URI' % name) from e
    else:
        try:
            value = config["obi"][name]
        except KeyError:
            value = default

    if not isinstance(value, bool):
        raise MalformedURIException('Malformed %s argument in URI' % name)

    return value


def _quality_offset(name):
    """
    Returns the quality offset associated with a quality format name
    (33 for sanger, 64 for solexa), or None for any other name.

    Both bytes (URI qualifiers) and str names are accepted.
    """
    if name in (b"sanger", "sanger"):
        return 33
    if name in (b"solexa", "solexa"):
        return 64
    return None


def open_uri(uri,
             bint input=True,
             type newviewtype=View):
    """
    Opens the resource designated by a URI: an element of a DMS, or a
    sequence/entry file.

    uri         : the URI, converted to bytes with tobytes()
    input       : when false, a missing DMS or view is created, unless
                  config["obi"]["nocreatedms"] is set
    newviewtype : view class used to open or create a view

    URIs with an empty or "dms" scheme are first tried as a DMS element;
    when no DMS is found in the path, config["obi"]["defaultdms"] is used
    if set. On success the DMS is recorded as config["obi"]["defaultdms"]
    when none was set, and the function returns
        (dms, element, type(element), uri rebuilt with the "dms" scheme).

    Otherwise the URI is opened as a file with uopen(). Reading options
    are taken from the URI query qualifiers, then from config["obi"],
    then from built-in defaults, and the function returns
        (file, entry iterator, entry class, uri as bytes).

    Returns None when the file cannot be opened.

    Raises FileNotFoundError when an explicit "dms" URI cannot be opened,
    MalformedURIException for an invalid qualifier value, and
    NotImplementedError for an unsupported file format or sequence type.
    """
    cdef bytes urib = tobytes(uri)
    cdef bytes scheme
    cdef tuple dms
    cdef dict qualifiers
    cdef DMS default_dms

    config = getConfiguration()
    urip = urlparse(urib)

    if 'obi' not in config:
        config['obi'] = {}

    try:
        default_dms = config["obi"]["defaultdms"]
    except KeyError:
        default_dms = None

    try:
        create = (not input) and (not config["obi"]["nocreatedms"])
    except KeyError:
        create = not input

    scheme = urip.scheme
    error = None

    if scheme == b"" or scheme == b"dms":
        dms = open_dms(urip.path, create)
        if dms is None and default_dms is not None:
            dms = (default_dms, urip.path)

        if dms is not None:
            try:
                resource = open_dms_element(dms[0], dms[1],
                                            create,
                                            newviewtype)

                scheme = b"dms"
                urip = ParseResultBytes(scheme=b"dms",
                                        netloc=urip.netloc,
                                        path=urip.path,
                                        params=urip.params,
                                        query=urip.query,
                                        fragment=urip.fragment)

                if default_dms is None:
                    config["obi"]["defaultdms"] = resource[0]

                return (resource[0],
                        resource[1],
                        type(resource[1]),
                        urlunparse(urip))
            except Exception as e:
                # Remembered so that it can be attached to the
                # FileNotFoundError raised below for explicit dms URIs.
                error = e

    if scheme == b"dms":
        logger('error', 'cannot open DMS: %s', uri)
        # Report the actual URI (the message used to be the literal 'uri')
        # and keep the underlying error, if any, as the cause.
        raise FileNotFoundError('cannot open DMS: %s' % tostr(urib)) from error

    # Not a DMS: try to open the URI as a file.
    if not urip.scheme:
        urib = b"file:" + urib

    try:
        file = uopen(tostr(urib))
        logger('info', 'Opened file : %s', tostr(urib))
    except Exception as e:
        file = None
        error = e

    if file is None:
        # Best effort: an unreadable file is reported to the caller by
        # returning None, not by raising.
        return None

    # parse_qs on a bytes query yields bytes keys mapped to lists of
    # bytes values.
    qualifiers = parse_qs(urip.query)

    if b'format' in qualifiers:
        format = qualifiers[b'format'][0]
    else:
        try:
            format = config["obi"]["fileformat"]
        except KeyError:
            format = None

    if b'seqtype' in qualifiers:
        seqtype = qualifiers[b'seqtype'][0]
    else:
        try:
            seqtype = config["obi"]["seqtype"]
        except KeyError:
            seqtype = b'nuc'

    if b'skip' in qualifiers:
        skip = int(qualifiers[b"skip"][0])
    else:
        try:
            skip = config["obi"]["skip"]
        except KeyError:
            skip = 0
    if skip < 0:
        raise MalformedURIException('Malformed skip argument in URI')

    if b'only' in qualifiers:
        only = int(qualifiers[b"only"][0])
    else:
        try:
            only = config["obi"]["only"]
        except KeyError:
            only = None
    if only is not None and only <= 0:
        raise MalformedURIException('Malformed only argument in URI')

    skiperror = _bool_qualifier(qualifiers, config, "skiperror", True)
    noquality = _bool_qualifier(qualifiers, config, "noquality", False)

    # Quality offset. The URI value is bytes: comparing it with str
    # literals never matched and left `offset` unbound.
    if b"qualityformat" in qualifiers:
        offset = _quality_offset(qualifiers[b"qualityformat"][0])
        if offset is None:
            raise MalformedURIException('Malformed qualityformat argument in URI')
    else:
        try:
            # NOTE(review): the [0] assumes the configuration stores the
            # quality format as a sequence of names - confirm.
            offset = _quality_offset(config["obi"]["qualityformat"][0])
            #offset=config["obi"]["qualityoffset"] # TODO discuss
        except KeyError:
            offset = None
        if offset is None:
            # Missing or unrecognised configuration value: default offset.
            offset = 33

    header = _bool_qualifier(qualifiers, config, "header", False)

    # Single-character options: slice with [:1] so that the value stays a
    # bytes object like the defaults (indexing bytes with [0] gives an int).
    if b"sep" in qualifiers:
        sep = qualifiers[b"sep"][0][:1]
    else:
        try:
            sep = config["obi"]["sep"]
        except KeyError:
            sep = None

#    if b"quote" in qualifiers:
#        pass

    if b"dec" in qualifiers:
        dec = qualifiers[b"dec"][0][:1]
    else:
        try:
            dec = config["obi"]["dec"]
        except KeyError:
            dec = b"."

    if b"nastring" in qualifiers:
        nastring = qualifiers[b"nastring"][0]
    else:
        try:
            nastring = config["obi"]["nastring"]
        except KeyError:
            nastring = b'NA'

    stripwhite = _bool_qualifier(qualifiers, config, "stripwhite", True)
    blanklineskip = _bool_qualifier(qualifiers, config, "blanklineskip", True)

    if b"commentchar" in qualifiers:
        commentchar = qualifiers[b"commentchar"][0][:1]
    else:
        try:
            commentchar = config["obi"]["commentchar"]
        except KeyError:
            commentchar = b'#'

    if format is not None:
        # Use the resolved `seqtype`: qualifiers[b"seqtype"] is a list, and
        # is absent when the sequence type comes from the configuration or
        # the default.
        if seqtype in (b"nuc", "nuc"):
            objclass = Nuc_Seq
            if format in (b"fasta", "fasta"):
                iseq = fastaNucIterator(file,
                                        skip=skip,
                                        only=only)
            elif format in (b"fastq", "fastq"):
                iseq = fastqIterator(file,
                                     skip=skip,
                                     only=only,
                                     offset=offset,
                                     noquality=noquality)
            else:
                raise NotImplementedError('Sequence file format not implemented')
        elif seqtype in (b"prot", "prot"):
            raise NotImplementedError()
        else:
            raise NotImplementedError('Sequence type not implemented')
    else:
        # No explicit format: let the universal parser detect it.
        iseq, objclass = entryIteratorFactory(file,
                                              skip, only,
                                              seqtype,
                                              offset,
                                              noquality,
                                              skiperror,
                                              header,
                                              sep,
                                              dec,
                                              nastring,
                                              stripwhite,
                                              blanklineskip,
                                              commentchar)

    #tmpdms = get_temp_dms()

    return (file, iseq, objclass, urib)