diff --git a/python/obitools3/dms/obiseq.pxd b/python/obitools3/dms/obiseq.pxd index b1bd163..3c6a4de 100644 --- a/python/obitools3/dms/obiseq.pxd +++ b/python/obitools3/dms/obiseq.pxd @@ -4,14 +4,19 @@ from .view.view cimport Line cdef class Seq(dict) : - cdef dict _annotations + + cpdef object clone(self) + cpdef str get_str(self) + cpdef get_symbol_at(self, int pos) cdef class Nuc_Seq(Seq) : - cdef char* _reverse_complement - cdef object _quality_array - cpdef bytes build_reverse_complement(self) + cdef Nuc_Seq _reverse_complement + cdef object _quality_array + cpdef set_quality(self, object new_quality, int offset=*) + cpdef object build_quality_array(self, list quality) + cpdef bytes build_reverse_complement(self) cdef class Seq_Stored(Line) : @@ -20,7 +25,13 @@ cdef class Seq_Stored(Line) : cdef class Nuc_Seq_Stored(Seq_Stored) : - cdef char* _reverse_complement + cdef Nuc_Seq _reverse_complement + cdef object _quality_array + cdef bytes _seq + cpdef set(self, object id, object seq, object definition=*, object quality=*, int offset=*, object tags=*) + cpdef get_symbol_at(self, int pos) cpdef set_quality_int(self, list new_qual) - cpdef set_quality_char(self, bytes new_qual, int offset=*) - cpdef bytes build_reverse_complement(self) \ No newline at end of file + cpdef set_quality_char(self, object new_qual, int offset=*) + cpdef object build_quality_array(self, list quality) + cpdef bytes build_reverse_complement(self) + cpdef str get_str(self) \ No newline at end of file diff --git a/python/obitools3/dms/obiseq.pyx b/python/obitools3/dms/obiseq.pyx index 1d5984e..e7e7d92 100644 --- a/python/obitools3/dms/obiseq.pyx +++ b/python/obitools3/dms/obiseq.pyx @@ -16,23 +16,33 @@ from .capi.obiutils cimport reverse_complement_sequence from cpython cimport array import array - from copy import deepcopy +import math + + +SEQUENCE_COLUMN = b"SEQ" + +SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN] + cdef class Seq(dict) : - def __init__(self, object id, object seq, object definition=None, object quality=None, int offset=-1, object tags=None) : + def __init__(self, object id, object seq, object definition=None, object tags=None) : cdef object k cdef bytes k_b self[ID_COLUMN] = tobytes(id) - self.seq = tobytes(seq) + self[SEQUENCE_COLUMN] = tobytes(seq) if definition is not None : self.definition = tobytes(definition) - self._annotations = {} if tags is not None : for k in tags: k_b = tobytes(k) - self[k_b] = tags[k] + if k_b not in SPECIAL_COLUMNS: + # TODO discuss convert value to bytes if str + if type(tags[k]) == str: + self[k_b] = str2bytes(tags[k]) + else: + self[k_b] = tags[k] @staticmethod def new_from_stored(Seq_Stored seq_to_clone) : @@ -42,10 +52,34 @@ cdef class Seq(dict) : def __contains__(self, object key): return dict.__contains__(self, tobytes(key)) - + + def __getitem__(self, object ref): + if type(ref) == int: + return self.get_symbol_at(ref) + else: + return super(Seq, self).__getitem__(tobytes(ref)) + + def __setitem__(self, object ref, object item): + super(Seq, self).__setitem__(tobytes(ref), item) + def __str__(self): - return tostr(self[NUC_SEQUENCE_COLUMN]) + return self.get_str() + + cpdef str get_str(self): + return tostr(self.seq) + def __len__(self): + return len(self.seq) + + cpdef object clone(self): + cdef object seq_class + seq_class = type(self) + seq = seq_class(self.id, self.seq, definition=self.definition, quality=self.quality, tags=self) + return seq + + cpdef get_symbol_at(self, int pos): + return self.seq[pos:pos+1] + # sequence id property getter and setter @property def id(self): # @ReservedAssignment @@ -54,24 +88,15 @@ cdef class Seq(dict) : @id.setter def id(self, object new_id): # @ReservedAssignment @DuplicatedSignature self[ID_COLUMN] = tobytes(new_id) - - # sequence annotations property getter and setter - @property - def annotations(self): # @ReservedAssignment - return self._annotations - - @annotations.setter - def annotations(self, object annotations): # @ReservedAssignment @DuplicatedSignature - self._annotations = annotations # sequence property getter and setter @property def seq(self): - return self[b"SEQ"] + return self[SEQUENCE_COLUMN] @seq.setter def seq(self, object new_seq): # @DuplicatedSignature - self[b"SEQ"] = tobytes(new_seq) + self[SEQUENCE_COLUMN] = tobytes(new_seq) # sequence definition property getter and setter @property @@ -87,21 +112,26 @@ cdef class Nuc_Seq(Seq) : def __init__(self, object id, object seq, object definition=None, object quality=None, int offset=-1, object tags=None) : cdef object k - cdef bytes k_b + cdef bytes k_b + cdef int q + cdef list q_proba_list + self[ID_COLUMN] = tobytes(id) - self.seq = tobytes(seq) + self[NUC_SEQUENCE_COLUMN] = tobytes(seq) if definition is not None : self.definition = tobytes(definition) if quality is not None: - self.quality = quality - if type(quality) == list: # TODO discuss - self._quality_array = array.array('d', quality) - self._annotations = {} - self._reverse_complement = NULL # TODO unnecessary? + self.set_quality(quality, offset=offset) + if tags is not None: for k in tags: k_b = tobytes(k) - self[k_b] = tags[k] + if k_b not in SPECIAL_COLUMNS: + # TODO discuss convert value to bytes if str + if type(tags[k]) == str: + self[k_b] = str2bytes(tags[k]) + else: + self[k_b] = tags[k] @staticmethod def new_from_stored(Nuc_Seq_Stored seq_to_clone) : @@ -123,47 +153,55 @@ cdef class Nuc_Seq(Seq) : def quality(self): return self[QUALITY_COLUMN] - @quality.setter - def quality(self, object new_quality): # @DuplicatedSignature - self[QUALITY_COLUMN] = new_quality + cpdef set_quality(self, object new_quality, int offset=-1): + cdef list quality_int + if type(new_quality) == list: + quality_int = new_quality + elif type(new_quality) == str or type(new_quality) == bytes: + quality_int = [] + for pos in range(len(new_quality)): + quality_int.append(ord(new_quality[pos:pos+1])-offset) + else: + raise Exception("Sequence quality in unrecognized format:", type(new_quality)) + self[QUALITY_COLUMN] = quality_int # sequence quality array property getter and setter @property def quality_array(self): + if self._quality_array is None: + self._quality_array = self.build_quality_array(self[QUALITY_COLUMN]) return self._quality_array + cpdef object build_quality_array(self, list quality): + cdef int q + cdef list q_proba_list + cdef object qual_array + q_proba_list = [(10.**(-q/10.)) for q in quality] + return array.array('d', q_proba_list) + # reverse complement property getter @property def reverse_complement(self): cdef bytes rev_comp - if self._reverse_complement == NULL: + cdef object seq_class + if self._reverse_complement is None: rev_comp = self.build_reverse_complement() - self._reverse_complement = rev_comp + if QUALITY_COLUMN in self: + reversed_quality = self.quality[::-1] + else: + reversed_quality = None + seq = Nuc_Seq(self.id, rev_comp, definition=self.definition, quality=reversed_quality, tags=self) + self._reverse_complement = seq return self._reverse_complement - - def __str__(self): - return tostr(self[NUC_SEQUENCE_COLUMN]) - + cpdef bytes build_reverse_complement(self) : cdef bytes rev_comp rev_comp = deepcopy(self[NUC_SEQUENCE_COLUMN]).lower() - return reverse_complement_sequence(rev_comp) - - -# TODO DISCUSS - # sequence quality (character string format) property getter and setter -# @property -# def quality_str(self): -# return self[QUALITY_COLUMN] -# -# @quality.setter -# def quality_str(self, list new_quality): # @DuplicatedSignature -# self[QUALITY_COLUMN] = new_quality - + return reverse_complement_sequence(rev_comp) cdef class Seq_Stored(Line) : - + # sequence id property getter and setter @property def id(self): # @ReservedAssignment @DuplicatedSignature @@ -181,29 +219,79 @@ cdef class Seq_Stored(Line) : @definition.setter def definition(self, object new_def): # @DuplicatedSignature self._view.get_column(DEFINITION_COLUMN).set_line(self._index, tobytes(new_def)) - + cdef class Nuc_Seq_Stored(Seq_Stored) : - def __init__(self, View view, index_t line_nb) : - self = Seq_Stored.__init__(self, view, line_nb) - self._reverse_complement = NULL # TODO unnecessary? +# def __init__(self, View view, index_t line_nb) : +# self = Seq_Stored.__init__(self, view, line_nb) + #self._reverse_complement = None # TODO unnecessary? - cpdef set_quality_int(self, list new_qual): - self._view.get_column(QUALITY_COLUMN).set_line(self.index, new_qual) +# def __setitem__(self, object column_name, object value): +# try: +# super(Seq, self).__setitem__(column_name, value) +# except CantGuessTypeException: +# column_name_b = tobytes(column_name) +# if column_name_b in SPECIAL_COLUMNS: +# self[column_name_b] = tags[column_name] + + cpdef set(self, object id, object seq, object definition=None, object quality=None, int offset=-1, object tags=None): + cdef object k + cdef bytes k_b + cdef int q + cdef list q_proba_list + + self[ID_COLUMN] = tobytes(id) + self[NUC_SEQUENCE_COLUMN] = tobytes(seq) + if definition is not None : + self.definition = tobytes(definition) + if quality is not None: + if type(quality) == list: + self.set_quality_int(quality) + elif type(quality) == str or type(quality) == bytes: + self.set_quality_char(quality, offset=offset) + else: + raise Exception("Sequence quality in unrecognized format") + + if tags is not None: + for k in tags: + k_b = tobytes(k) + if k_b not in SPECIAL_COLUMNS: + # TODO discuss convert value to bytes if str + if type(tags[k]) == str: + self[k_b] = str2bytes(tags[k]) + else: + self[k_b] = tags[k] + + def __getitem__(self, object ref): + if type(ref) == int: + return self.get_symbol_at(ref) + else: + return super(Nuc_Seq_Stored, self).__getitem__(ref) + + cpdef get_symbol_at(self, int pos): + return self.seq[pos:pos+1] - cpdef set_quality_char(self, bytes new_qual, int offset=-1): - self._view.get_column(QUALITY_COLUMN).set_str_line(self.index, new_qual, offset=offset) - # seq property getter and setter @property def seq(self): - return self._view.get_column(NUC_SEQUENCE_COLUMN).get_line(self.index) + if not self._view.read_only : + return self._view.get_column(NUC_SEQUENCE_COLUMN).get_line(self.index) + else: + if self._seq is None: + self._seq = self._view.get_column(NUC_SEQUENCE_COLUMN).get_line(self.index) + return self._seq @seq.setter def seq(self, object new_seq): # @DuplicatedSignature self._view.get_column(NUC_SEQUENCE_COLUMN).set_line(self.index, tobytes(new_seq)) - + + cpdef set_quality_int(self, list new_qual): + self._view.get_column(QUALITY_COLUMN).set_line(self.index, new_qual) + + cpdef set_quality_char(self, object new_qual, int offset=-1): + self._view.get_column(QUALITY_COLUMN).set_str_line(self.index, tobytes(new_qual), offset=offset) + # quality property getter and setter @property def quality(self): @@ -211,39 +299,64 @@ cdef class Nuc_Seq_Stored(Seq_Stored) : @quality.setter def quality(self, object new_qual): # @DuplicatedSignature - if (new_qual is None) or (type(new_qual) == list) : # TODO check that quality column exists - self._view.get_column(QUALITY_COLUMN).set_line(self._index, new_qual) + if (new_qual is None) or (type(new_qual) == list) : # TODO check that quality column exists + self.set_quality_int(new_qual) + # WARNING: default offset used elif (type(new_qual) == str) or (type(new_qual) == bytes) : # Quality is in str form - self._view.get_column(QUALITY_COLUMN).set_str_line(self._index, tobytes(new_qual)) + self.set_quality_char(new_qual) else : raise Exception("Sequence quality in unrecognized format") - + # quality character string property getter + # WARNING: default offset used @property def quality_str(self): return self._view.get_column(QUALITY_COLUMN).get_str_line(self._index) + # sequence quality array property getter and setter + @property + def quality_array(self): + if self._quality_array is None: + self._quality_array = self.build_quality_array(self._view.get_column(QUALITY_COLUMN).get_line(self.index)) + return self._quality_array + + cpdef object build_quality_array(self, list quality): + cdef int q + cdef list q_proba_list + cdef object qual_array + q_proba_list = [(10.**(-q/10.)) for q in quality] + return array.array('d', q_proba_list) + # reverse complement property getter @property def reverse_complement(self): cdef bytes rev_comp - if self._reverse_complement == NULL: + cdef object seq_class + if self._reverse_complement is None: rev_comp = self.build_reverse_complement() - self._reverse_complement = rev_comp + if QUALITY_COLUMN in self: + reversed_quality = self.quality[::-1] + else: + reversed_quality = None + seq = Nuc_Seq(self.id, rev_comp, definition=self.definition, quality=reversed_quality, tags=self) + self._reverse_complement = seq return self._reverse_complement + cpdef bytes build_reverse_complement(self) : + cdef bytes rev_comp + rev_comp = deepcopy(self[NUC_SEQUENCE_COLUMN]).lower() + return reverse_complement_sequence(rev_comp) + def __str__(self): + return self.get_str() + + cpdef str get_str(self): return tostr(self._view.get_column(NUC_SEQUENCE_COLUMN).get_line(self.index)) def __len__(self): return len(self._view.get_column(NUC_SEQUENCE_COLUMN).get_line(self.index)) - cpdef bytes build_reverse_complement(self) : - cdef bytes rev_comp - rev_comp = deepcopy(self[NUC_SEQUENCE_COLUMN]).lower() - return reverse_complement_sequence(rev_comp) - # TODO static method to import OBI_Nuc_Seq to OBI_Nuc_Seq_Stored ? \ No newline at end of file