Cython: fasta and fastq header formatter
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
cdef class HeaderFormat:
|
cdef class HeaderFormat:
|
||||||
|
|
||||||
cdef str start
|
cdef bytes start
|
||||||
cdef set tags
|
cdef set tags
|
||||||
cdef bint printNaKeys
|
cdef bint printNaKeys
|
||||||
cdef size_t headerBufferLength
|
cdef size_t headerBufferLength
|
||||||
|
@@ -1,11 +1,22 @@
|
|||||||
|
#cython: language_level=3
|
||||||
|
|
||||||
|
from obitools3.dms.capi.obiview cimport NUC_SEQUENCE_COLUMN, \
|
||||||
|
ID_COLUMN, \
|
||||||
|
DEFINITION_COLUMN, \
|
||||||
|
QUALITY_COLUMN, \
|
||||||
|
COUNT_COLUMN
|
||||||
|
|
||||||
|
from obitools3.utils cimport str2bytes
|
||||||
|
|
||||||
|
|
||||||
cdef class HeaderFormat:
|
cdef class HeaderFormat:
|
||||||
|
|
||||||
def __init__(self, bint fastaHeader=True, list tags=[], bint printNAKeys=False):
|
SPECIAL_KEYS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
|
||||||
|
|
||||||
|
def __init__(self, str format="fasta", list tags=[], bint printNAKeys=False):
|
||||||
'''
|
'''
|
||||||
|
@param format:
|
||||||
@param fastaHeader:
|
@type format: `str`
|
||||||
@type fastaHeader: `bool`
|
|
||||||
|
|
||||||
@param tags:
|
@param tags:
|
||||||
@type tags: `list` of `bytes`
|
@type tags: `list` of `bytes`
|
||||||
@@ -17,44 +28,42 @@ cdef class HeaderFormat:
|
|||||||
self.tags = set(tags)
|
self.tags = set(tags)
|
||||||
self.printNaKeys = printNAKeys
|
self.printNaKeys = printNAKeys
|
||||||
|
|
||||||
if fastaHeader:
|
if format=="fasta":
|
||||||
self.start=">"
|
self.start=b">"
|
||||||
else:
|
elif format=="fastq":
|
||||||
self.start="@"
|
self.start=b"@"
|
||||||
|
|
||||||
self.headerBufferLength = 1000
|
self.headerBufferLength = 1000
|
||||||
#self.headerBuffer = []
|
|
||||||
|
|
||||||
def __call__(self, dict data):
|
def __call__(self, object data):
|
||||||
cdef str header
|
cdef bytes header
|
||||||
cdef dict tags = data['tags']
|
cdef list tags = [key for key in data if key not in self.SPECIAL_KEYS]
|
||||||
cdef set ktags
|
cdef set ktags
|
||||||
cdef list lines = [""]
|
cdef list lines = [b""]
|
||||||
cdef str tagline
|
cdef bytes tagline
|
||||||
|
|
||||||
|
|
||||||
if self.tags is not None and self.tags:
|
if self.tags is not None and self.tags:
|
||||||
ktags = self.tags
|
ktags = self.tags
|
||||||
else:
|
else:
|
||||||
ktags = set(tags.keys())
|
ktags = set(tags)
|
||||||
|
|
||||||
for k in ktags:
|
for k in ktags:
|
||||||
if k in tags:
|
if k in tags:
|
||||||
value = tags[k]
|
value = data[k]
|
||||||
if value is not None or self.printNaKeys:
|
if value is not None or self.printNaKeys:
|
||||||
lines.append("%s=%s;" % (k,tags[k]))
|
lines.append(k + b"=" + str2bytes(str(data[k]))) #TODO bytes() method on values (str equivalent)
|
||||||
|
|
||||||
if len(lines) > 1:
|
if len(lines) > 1:
|
||||||
tagline=" ".join(lines)
|
tagline=b" ".join(lines)
|
||||||
else:
|
else:
|
||||||
tagline=""
|
tagline=b""
|
||||||
|
|
||||||
if data['definition'] is not None:
|
if data[DEFINITION_COLUMN] is not None:
|
||||||
header = "%s%s%s %s" % (self.start,data['id'],
|
header = self.start + data[ID_COLUMN] + tagline + b" " + data[DEFINITION_COLUMN]
|
||||||
tagline,
|
|
||||||
data['definition'])
|
|
||||||
else:
|
else:
|
||||||
header = "%s%s%s" % (self.start,data['id'],
|
header = self.start + data[ID_COLUMN] + tagline
|
||||||
tagline)
|
|
||||||
|
|
||||||
return header
|
return header
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user