# Patch summary (commentary only; the diff text below is unchanged).
#
# Files touched: python/obitools3/format/header.pxd and header.pyx.
#
# header.pxd
#   * HeaderFormat.start is re-declared from `str` to `bytes`.
#
# header.pyx
#   * Adds the `#cython: language_level=3` directive.
#   * cimports NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN
#     and COUNT_COLUMN from obitools3.dms.capi.obiview, and str2bytes from
#     obitools3.utils.
#   * Adds the class attribute SPECIAL_KEYS (sequence, id, definition and
#     quality column names); __call__ treats every other key of `data` as a tag.
#   * __init__: the `bint fastaHeader=True` parameter is replaced by
#     `str format="fasta"`; `start` becomes b">" for "fasta" and b"@" for "fastq".
#   * __call__: takes `object data` instead of `dict data`, reads the id and
#     definition through ID_COLUMN / DEFINITION_COLUMN, and builds the header
#     by bytes concatenation instead of %-formatting of str.
#
# NOTE(review): the removed tag format was "%s=%s;" but the added line appends
#   k + b"=" + str2bytes(str(data[k])) with no trailing b";", so tags end up
#   separated by spaces only -- confirm this is intended.
# NOTE(review): __init__ has no branch for a `format` other than "fasta" or
#   "fastq"; the visible code does not assign self.start in that case --
#   presumably __call__ then fails on `self.start + ...`; TODO confirm/validate.
# NOTE(review): renaming the `fastaHeader` keyword to `format` changes the
#   constructor signature -- verify callers that pass fastaHeader=...
# NOTE(review): when a value is None and printNaKeys is set, the tag text comes
#   from str(data[k]) -- presumably rendered as "None"; confirm desired NA text.
# NOTE(review): COUNT_COLUMN is cimported but not used in the visible hunks.
# NOTE(review): `list tags=[]` is a mutable default; the visible code only
#   passes it to set(), so it is not mutated here.
# NOTE(review): the line breaks of this patch appear to have been collapsed, so
#   hunk line counts cannot be checked against the headers from this view.
diff --git a/python/obitools3/format/header.pxd b/python/obitools3/format/header.pxd index 8e407e7..e08ed9f 100644 --- a/python/obitools3/format/header.pxd +++ b/python/obitools3/format/header.pxd @@ -1,6 +1,6 @@ cdef class HeaderFormat: - cdef str start + cdef bytes start cdef set tags cdef bint printNaKeys cdef size_t headerBufferLength diff --git a/python/obitools3/format/header.pyx b/python/obitools3/format/header.pyx index 229f723..1fcf237 100644 --- a/python/obitools3/format/header.pyx +++ b/python/obitools3/format/header.pyx @@ -1,11 +1,22 @@ +#cython: language_level=3 + +from obitools3.dms.capi.obiview cimport NUC_SEQUENCE_COLUMN, \ + ID_COLUMN, \ + DEFINITION_COLUMN, \ + QUALITY_COLUMN, \ + COUNT_COLUMN + +from obitools3.utils cimport str2bytes + cdef class HeaderFormat: - def __init__(self, bint fastaHeader=True, list tags=[], bint printNAKeys=False): + SPECIAL_KEYS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN] + + def __init__(self, str format="fasta", list tags=[], bint printNAKeys=False): ''' - - @param fastaHeader: - @type fastaHeader: `bool` + @param format: + @type format: `str` @param tags: @type tags: `list` of `bytes` @@ -17,44 +28,42 @@ cdef class HeaderFormat: self.tags = set(tags) self.printNaKeys = printNAKeys - if fastaHeader: - self.start=">" - else: - self.start="@" + if format=="fasta": + self.start=b">" + elif format=="fastq": + self.start=b"@" self.headerBufferLength = 1000 - #self.headerBuffer = [] + - def __call__(self, dict data): - cdef str header - cdef dict tags = data['tags'] + def __call__(self, object data): + cdef bytes header + cdef list tags = [key for key in data if key not in self.SPECIAL_KEYS] cdef set ktags - cdef list lines = [""] - cdef str tagline + cdef list lines = [b""] + cdef bytes tagline + if self.tags is not None and self.tags: ktags = self.tags else: - ktags = set(tags.keys()) + ktags = set(tags) for k in ktags: if k in tags: - value = tags[k] + value = data[k] if value is not None or 
self.printNaKeys: - lines.append("%s=%s;" % (k,tags[k])) + lines.append(k + b"=" + str2bytes(str(data[k]))) #TODO bytes() method on values (str equivalent) if len(lines) > 1: - tagline=" ".join(lines) + tagline=b" ".join(lines) else: - tagline="" + tagline=b"" - if data['definition'] is not None: - header = "%s%s%s %s" % (self.start,data['id'], - tagline, - data['definition']) + if data[DEFINITION_COLUMN] is not None: + header = self.start + data[ID_COLUMN] + tagline + b" " + data[DEFINITION_COLUMN] else: - header = "%s%s%s" % (self.start,data['id'], - tagline) - + header = self.start + data[ID_COLUMN] + tagline + return header