#cython: language_level=3

from obitools3.utils cimport bytes2str, str2bytes

from .capi.obidms cimport obi_dms, \
                          obi_close_dms

from .capi.obidmscolumn cimport obi_close_column, \
                                OBIDMS_column_p, \
                                OBIDMS_column_header_p

from .capi.obiutils cimport obi_format_date

from .capi.obitypes cimport const_char_p, \
                            OBIType_t, \
                            OBI_INT, \
                            OBI_FLOAT, \
                            OBI_BOOL, \
                            OBI_CHAR, \
                            OBI_STR, \
                            OBI_SEQ, \
                            name_data_type, \
                            only_ATGC    # discuss

from ._obidms cimport OBIDMS, \
                      OBIDMS_column, \
                      OBIView, \
                      OBIView_line

from ._obitaxo cimport OBI_Taxonomy

from ._obiseq cimport OBI_Nuc_Seq, OBI_Nuc_Seq_Stored

from ._obidmscolumn_int cimport OBIDMS_column_int, \
                                OBIDMS_column_multi_elts_int

from ._obidmscolumn_float cimport OBIDMS_column_float, \
                                  OBIDMS_column_multi_elts_float

from ._obidmscolumn_bool cimport OBIDMS_column_bool, \
                                 OBIDMS_column_multi_elts_bool

from ._obidmscolumn_char cimport OBIDMS_column_char, \
                                 OBIDMS_column_multi_elts_char

from ._obidmscolumn_str cimport OBIDMS_column_str, \
                                OBIDMS_column_multi_elts_str

from ._obidmscolumn_seq cimport OBIDMS_column_seq, \
                                OBIDMS_column_multi_elts_seq

from .capi.obiview cimport Obiview_p, \
                           Obiviews_infos_all_p, \
                           Obiview_infos_p, \
                           Column_reference_p, \
                           obi_new_view_nuc_seqs, \
                           obi_new_view, \
                           obi_new_view_cloned_from_name, \
                           obi_new_view_nuc_seqs_cloned_from_name, \
                           obi_open_view, \
                           obi_read_view_infos, \
                           obi_close_view_infos, \
                           obi_view_delete_column, \
                           obi_view_add_column, \
                           obi_view_get_column, \
                           obi_view_get_column, \
                           obi_view_get_pointer_on_column_in_view, \
                           obi_select_line, \
                           obi_select_lines, \
                           obi_save_and_close_view, \
                           VIEW_TYPE_NUC_SEQS, \
                           NUC_SEQUENCE_COLUMN, \
                           ID_COLUMN, \
                           DEFINITION_COLUMN

from libc.stdlib cimport malloc, free
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer


cdef index_t* _build_line_selection(list line_selection) except NULL :
    """Copy a Python list of line indices into a new C array terminated by -1.

    Raises MemoryError if the allocation fails. If an element of the list
    cannot be converted to index_t, the array is freed and the conversion
    error propagates.
    """
    cdef Py_ssize_t count = len(line_selection)
    cdef Py_ssize_t i
    cdef index_t* selection_p

    # malloc() returns void*, which Cython does not implicitly convert to a typed pointer
    selection_p = <index_t*> malloc((count + 1) * sizeof(index_t))
    if selection_p == NULL :
        raise MemoryError("Could not allocate the line selection array")

    try :
        for i in range(count) :
            selection_p[i] = line_selection[i]
    except :
        free(selection_p)
        raise

    selection_p[count] = -1    # end-of-selection marker
    return selection_p


cdef class OBIDMS_column :
    """Python-side handle on a column of a view.

    Caches the data type name, column name, number of elements per line and
    element names read from the column header at construction. Line access
    goes through get_line()/set_line(), which are not defined in this class.
    """

    # Should only be initialized through a subclass
    def __init__(self, OBIView view, str column_name):
        """Bind this object to the column called `column_name` in `view`."""
        cdef OBIDMS_column_p column_p
        cdef OBIDMS_column_p* column_pp

        # NOTE(review): the original text may have carried an explicit cast here - confirm against the .pxd
        column_pp = obi_view_get_pointer_on_column_in_view(view.pointer, str2bytes(column_name))
        column_p = column_pp[0]    # TODO ugly cython dereferencing but can't find better

        # Fill structure
        self.pointer = column_pp
        self.dms = view.dms
        self.view = view
        self.data_type = bytes2str(name_data_type((column_p.header).returned_data_type))
        self.column_name = bytes2str((column_p.header).name)
        self.nb_elements_per_line = (column_p.header).nb_elements_per_line
        self.elements_names = (bytes2str((column_p.header).elements_names)).split(';')

    def __setitem__(self, index_t line_nb, object value):
        self.set_line(line_nb, value)

    def __getitem__(self, index_t line_nb):
        return self.get_line(line_nb)

    def __len__(self):
        """Return the `lines_used` field of the column header."""
        return (self.pointer)[0].header.lines_used

    def __sizeof__(self):
        """Return header_size + data_size as stored in the column header."""
        return ((self.pointer)[0].header.header_size + (self.pointer)[0].header.data_size)

    def __iter__(self):
        """Yield get_line(i) for every i below the header's `lines_used`."""
        # Declarations
        cdef index_t lines_used
        cdef index_t line_nb

        # Yield each line
        lines_used = (self.pointer)[0].header.lines_used
        for line_nb in range(lines_used):
            yield self.get_line(line_nb)

    cpdef update_pointer(self):
        """Fetch the column pointer again from the view, looked up by column name."""
        self.pointer = obi_view_get_pointer_on_column_in_view(self.view.pointer, str2bytes(self.column_name))

    cpdef list get_elements_names(self):
        return self.elements_names

    cpdef str get_data_type(self):
        return self.data_type

    cpdef index_t get_nb_lines_used(self):
        return (self.pointer)[0].header.lines_used

    cpdef str get_creation_date(self):
        return bytes2str(obi_format_date((self.pointer)[0].header.creation_date))

    cpdef str get_comments(self):
        return bytes2str((self.pointer)[0].header.comments)

    def __str__(self) :
        """Return str() of every line, each followed by a newline."""
        return "".join([str(line) + "\n" for line in self])

    def __repr__(self) :
        return (self.column_name + ", version " + str((self.pointer)[0].header.version) + ", data type: " + self.data_type)

    cpdef close(self):
        """Close the underlying column; raise Exception if the C call returns a negative value."""
        if obi_close_column((self.pointer)[0]) < 0 :
            raise Exception("Problem closing a column")

    @staticmethod
    cdef object get_subclass_type(OBIDMS_column_p column_p) :
        """Return the column class matching the header of `column_p`.

        The class is chosen from the header's returned data type and from
        whether the column stores exactly one element per line. Raises
        Exception for an unrecognised data type.
        """
        cdef object subclass
        cdef OBIDMS_column_header_p header
        cdef OBIType_t col_type
        cdef bint col_one_element_per_line

        header = column_p.header
        col_type = header.returned_data_type
        col_one_element_per_line = ((header.nb_elements_per_line) == 1)

        if col_type == OBI_INT :
            subclass = OBIDMS_column_int if col_one_element_per_line else OBIDMS_column_multi_elts_int
        elif col_type == OBI_FLOAT :
            subclass = OBIDMS_column_float if col_one_element_per_line else OBIDMS_column_multi_elts_float
        elif col_type == OBI_BOOL :
            subclass = OBIDMS_column_bool if col_one_element_per_line else OBIDMS_column_multi_elts_bool
        elif col_type == OBI_CHAR :
            subclass = OBIDMS_column_char if col_one_element_per_line else OBIDMS_column_multi_elts_char
        elif col_type == OBI_STR :
            subclass = OBIDMS_column_str if col_one_element_per_line else OBIDMS_column_multi_elts_str
        elif col_type == OBI_SEQ :
            subclass = OBIDMS_column_seq if col_one_element_per_line else OBIDMS_column_multi_elts_seq
        else :
            raise Exception("Problem with the data type")

        return subclass


######################################################################################################


cdef class OBIDMS_column_multi_elts(OBIDMS_column) :
    """Column variant whose lines are addressed element by element, by element name."""

    def __getitem__(self, index_t line_nb):
        """Return an OBIDMS_column_line proxy for line `line_nb`."""
        return OBIDMS_column_line(self, line_nb)

    cpdef set_line(self, index_t line_nb, dict values):
        """Call set_item() for every (element name, value) pair of `values`."""
        for element_name in values :
            self.set_item(line_nb, element_name, values[element_name])


######################################################################################################


cdef class OBIDMS_column_line :
    """Proxy on one line of a column, indexed by element name."""

    def __init__(self, OBIDMS_column column, index_t line_nb) :
        self.index = line_nb
        self.column = column

    def __getitem__(self, str element_name) :
        return self.column.get_item(self.index, element_name)

    def __setitem__(self, str element_name, object value):
        self.column.set_item(self.index, element_name, value)

    def __contains__(self, str element_name):
        return (element_name in self.column.elements_names)

    def __repr__(self) :
        return str(self.column.get_line(self.index))


##########################################


cdef class OBIView :
    """Python-side handle on a view: a named set of columns in an OBIDMS."""

    def __init__(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, list line_selection=None, str comments=""):
        """Create a view (new=True) or open an existing one (new=False).

        When creating, `view_to_clone` may be None, the name of a view (str)
        or an OBIView instance, and `line_selection` an optional list of line
        indices handed to the C layer. Raises Exception if the C layer
        returns NULL.
        """
        cdef Obiview_p view = NULL
        cdef int i
        cdef str col_name
        cdef OBIDMS_column_p column_p
        cdef OBIDMS_column_header_p header
        cdef index_t* line_selection_p = NULL

        self.dms = dms

        # Create the C array for the line selection if needed
        # NOTE(review): the array is never freed on the Python side - confirm whether the C layer takes ownership
        if line_selection is not None :
            line_selection_p = _build_line_selection(line_selection)

        # Create the view if needed
        if new :
            if view_to_clone is None :
                view = obi_new_view(dms.pointer, str2bytes(view_name), NULL, line_selection_p, str2bytes(comments))
            elif isinstance(view_to_clone, str) :
                view = obi_new_view_cloned_from_name(dms.pointer, str2bytes(view_name), str2bytes(view_to_clone), line_selection_p, str2bytes(comments))
            else :
                # Checked cast: a C attribute cannot be read from a plain Python object reference
                view = obi_new_view(dms.pointer, str2bytes(view_name), (<OBIView?> view_to_clone).pointer, line_selection_p, str2bytes(comments))
        # Else, open the existing view
        elif view_name is not None :
            view = obi_open_view(dms.pointer, str2bytes(view_name))
        else :
            view = obi_open_view(dms.pointer, NULL)    # TODO discuss

        if view == NULL :
            raise Exception("Error creating/opening a view")

        self.pointer = view
        self.name = bytes2str(view.name)
        # Read by __repr__; set the same way OBIView_NUC_SEQS does
        self.comments = bytes2str(view.comments)

        # Go through columns to build list of corresponding python instances
        self.columns = {}
        for i in range(view.column_count) :
            # NOTE(review): the original text may have carried an explicit cast here - confirm against the .pxd
            column_p = (view.columns)[i]
            header = (column_p).header
            col_name = bytes2str(header.name)
            subclass = OBIDMS_column.get_subclass_type(column_p)
            self.columns[col_name] = subclass(self, col_name)

    def __repr__(self) :
        """Return name, comments and line count, then one repr() line per column."""
        cdef list parts
        parts = [str(self.name) + ", " + str(self.comments) + ", " + str(self.pointer.line_count) + " lines\n"]
        for column_name in self.columns :
            parts.append(repr(self.columns[column_name]) + '\n')
        return "".join(parts)

    cpdef delete_column(self, str column_name) :
        """Delete a column from the view and refresh the pointers of the remaining columns."""
        cdef str column_n

        if obi_view_delete_column(self.pointer, str2bytes(column_name)) < 0 :
            raise Exception("Problem deleting a column from a view")

        # Update the dictionaries of column pointers and column objects, and update pointers in column objects (make function?):
        (self.columns).pop(column_name)
        for column_n in self.columns :
            (self.columns[column_n]).update_pointer()

    cpdef add_column(self,
                     str column_name,
                     obiversion_t version_number=-1,
                     str type='',
                     index_t nb_lines=0,
                     index_t nb_elements_per_line=1,
                     list elements_names=None,
                     str indexer_name="",
                     str comments="",
                     bint create=True
                    ) :
        """Add a column to the view and register its Python wrapper in self.columns.

        `type` is one of 'OBI_INT', 'OBI_FLOAT', 'OBI_BOOL', 'OBI_CHAR',
        'OBI_STR', 'OBI_SEQ', or empty. With several elements per line,
        `elements_names` must list them; with one, the column name is used.
        Raises Exception on an unknown type, inconsistent element arguments
        or a failure reported by the C layer.
        """
        cdef bytes column_name_b
        cdef bytes elements_names_b
        cdef object subclass
        cdef OBIDMS_column_p column_p
        # Defined value when no type is given instead of an unassigned variable.
        # NOTE(review): assumes 0 is an acceptable OBIType_t when `type` is empty - confirm against the C enum
        cdef OBIType_t data_type = <OBIType_t> 0

        column_name_b = str2bytes(column_name)

        if nb_elements_per_line > 1 :
            if elements_names is None :
                raise Exception("Elements names must be provided for a column with several elements per line")
            elements_names_b = str2bytes(';'.join(elements_names))
        elif nb_elements_per_line == 1 :
            elements_names_b = column_name_b
        else :
            raise Exception("Invalid number of elements per line")

        if type :
            if type == 'OBI_INT' :
                data_type = OBI_INT
            elif type == 'OBI_FLOAT' :
                data_type = OBI_FLOAT
            elif type == 'OBI_BOOL' :
                data_type = OBI_BOOL
            elif type == 'OBI_CHAR' :
                data_type = OBI_CHAR
            elif type == 'OBI_STR' :
                data_type = OBI_STR
            elif type == 'OBI_SEQ' :
                data_type = OBI_SEQ
            else :
                raise Exception("Invalid provided data type")

        if (obi_view_add_column(self.pointer, column_name_b, version_number,    # TODO should return pointer on column?
                                data_type, nb_lines, nb_elements_per_line,
                                elements_names_b, str2bytes(indexer_name), str2bytes(comments),
                                create) < 0) :
            raise Exception("Problem adding a column in a view")

        # Get the column pointer
        column_p = obi_view_get_column(self.pointer, column_name_b)

        # Open and store the subclass
        subclass = OBIDMS_column.get_subclass_type(column_p)
        (self.columns)[column_name] = subclass(self, column_name)

    cpdef save_and_close(self) :
        """Save and close the view; raise Exception if the C call returns a negative value."""
        if (obi_save_and_close_view(self.pointer) < 0) :
            raise Exception("Problem closing a view")

    def __iter__(self):
        """Yield self[i] for every line index below the view's line count."""
        # iter on each line of all columns

        # Declarations
        cdef index_t lines_used
        cdef index_t line_nb
        cdef OBIView_line line    # TODO Check that this works for NUC SEQ views

        # Yield each line
        lines_used = (self.pointer).line_count
        for line_nb in range(lines_used) :
            line = self[line_nb]
            yield line

    def __getitem__(self, object item) :
        """Return a column for a str key, or an OBIView_line for an int key."""
        if isinstance(item, str) :
            return (self.columns)[item]
        elif isinstance(item, int) :
            return OBIView_line(self, item)
        # Any other key type falls through and returns None, as before

    cpdef select_line(self, index_t line_nb) :
        if obi_select_line(self.pointer, line_nb) < 0 :
            raise Exception("Problem selecting a line")

    cpdef select_lines(self, list line_selection) :
        """Pass a -1 terminated copy of `line_selection` to obi_select_lines()."""
        cdef index_t* line_selection_p

        # NOTE(review): the array is never freed on the Python side - confirm whether the C layer takes ownership
        line_selection_p = _build_line_selection(line_selection)
        if obi_select_lines(self.pointer, line_selection_p) < 0 :
            raise Exception("Problem selecting a list of lines")

    def __contains__(self, str column_name):
        return (column_name in self.columns)

    def __str__(self) :
        """Return str() of every line of the view, each followed by a newline."""
        cdef OBIView_line line
        cdef list parts
        parts = []
        for line in self.__iter__() :
            parts.append(str(line) + "\n")
        return "".join(parts)


#############################################


cdef class OBIView_NUC_SEQS(OBIView):
    """View that also exposes its id, sequence and definition columns as attributes."""

    def __init__(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, list line_selection=None, str comments=""):
        """Create or open a nucleotide sequence view.

        Takes the same arguments as OBIView.__init__ but creates the view
        with the obi_new_view_nuc_seqs* functions, then binds self.ids,
        self.sequences and self.definitions to the columns named by
        ID_COLUMN, NUC_SEQUENCE_COLUMN and DEFINITION_COLUMN.
        """
        cdef Obiview_p view = NULL
        cdef int i
        cdef str col_name
        cdef OBIDMS_column_p column_p
        cdef OBIDMS_column_header_p header
        cdef index_t* line_selection_p = NULL

        self.dms = dms

        # NOTE(review): the array is never freed on the Python side - confirm whether the C layer takes ownership
        if line_selection is not None :
            line_selection_p = _build_line_selection(line_selection)

        if new :
            if view_to_clone is None :
                view = obi_new_view_nuc_seqs(dms.pointer, str2bytes(view_name), NULL, line_selection_p, str2bytes(comments))
            elif isinstance(view_to_clone, str) :
                view = obi_new_view_nuc_seqs_cloned_from_name(dms.pointer, str2bytes(view_name), str2bytes(view_to_clone), line_selection_p, str2bytes(comments))
            else :
                # Checked cast: a C attribute cannot be read from a plain Python object reference
                view = obi_new_view_nuc_seqs(dms.pointer, str2bytes(view_name), (<OBIView?> view_to_clone).pointer, line_selection_p, str2bytes(comments))
        elif view_name is not None :
            view = obi_open_view(dms.pointer, str2bytes(view_name))
        else :
            view = obi_open_view(dms.pointer, NULL)

        if view == NULL :
            raise Exception("Error creating/opening view")

        self.pointer = view
        self.name = bytes2str(view.name)
        self.comments = bytes2str(view.comments)

        # Go through columns to build list of corresponding python instances
        self.columns = {}
        for i in range(view.column_count) :
            # NOTE(review): the original text may have carried an explicit cast here - confirm against the .pxd
            column_p = (view.columns)[i]
            header = (column_p).header
            col_name = bytes2str(header.name)
            subclass = OBIDMS_column.get_subclass_type(column_p)
            self.columns[col_name] = subclass(self, col_name)

        self.ids = self.columns[bytes2str(ID_COLUMN)]
        self.sequences = self.columns[bytes2str(NUC_SEQUENCE_COLUMN)]
        self.definitions = self.columns[bytes2str(DEFINITION_COLUMN)]

    cpdef delete_column(self, str column_name) :
        """Delete a column from the view and refresh the pointers of the remaining columns."""
        cdef str column_n

        if obi_view_delete_column(self.pointer, str2bytes(column_name)) < 0 :
            raise Exception("Problem deleting a column from a view")

        # Remove instance from the dictionary
        (self.columns).pop(column_name)

        for column_n in self.columns :
            (self.columns[column_n]).update_pointer()

    def __getitem__(self, object item) :
        """Return a column for a str key, or an OBI_Nuc_Seq_Stored for an int key."""
        if isinstance(item, str) :
            return (self.columns)[item]
        elif isinstance(item, int) :
            return OBI_Nuc_Seq_Stored(self, item)
        # Any other key type falls through and returns None, as before

    def __setitem__(self, index_t line_idx, OBI_Nuc_Seq sequence_obj) :
        """Copy every key of `sequence_obj` into line `line_idx` of the view."""
        for key in sequence_obj :
            self[line_idx][key] = sequence_obj[key]


#############################################


cdef class OBIView_line :
    """Proxy on one line of a view, indexed by column name."""

    def __init__(self, OBIView view, index_t line_nb) :
        self.index = line_nb
        self.view = view

    def __getitem__(self, str column_name) :
        return ((self.view).columns)[column_name][self.index]

    def __setitem__(self, str column_name, object value):
        """Set the value of `column_name` on this line, creating the column if needed.

        When the column does not exist yet, its type is guessed from the
        Python type of `value`. Raises Exception if `value` is None or of an
        unsupported type.
        """
        # TODO detect multiple elements (dict type)? put somewhere else? but more risky (in get)

        cdef str value_obitype

        if column_name not in self.view :
            if value is None :
                raise Exception("Trying to create a column from a None value (can't guess type)")
            # bool is tested before int because bool is a subclass of int
            if isinstance(value, bool) :
                value_obitype = 'OBI_BOOL'
            elif isinstance(value, int) :
                value_obitype = 'OBI_INT'
            elif isinstance(value, float) :
                value_obitype = 'OBI_FLOAT'
            elif isinstance(value, str) :
                if only_ATGC(str2bytes(value)) :    # TODO detect IUPAC?
                    value_obitype = 'OBI_SEQ'
                elif len(value) == 1 :
                    value_obitype = 'OBI_CHAR'
                else :
                    # Also covers the empty string, which previously left the type unset
                    value_obitype = 'OBI_STR'
            else :
                raise Exception("Could not guess the type of a value to create a new column")
            self.view.add_column(column_name, type=value_obitype)

        (((self.view).columns)[column_name]).set_line(self.index, value)

    def __contains__(self, str column_name):
        return (column_name in self.view)

    def __repr__(self):
        """Return str() of a dict mapping every column name to this line's value."""
        cdef dict line
        cdef str column_name
        line = {}
        for column_name in self.view.columns :
            line[column_name] = self[column_name]
        return str(line)


##########################################


cdef class OBIDMS :
    """Python-side handle on an OBIDMS, the container of views and taxonomies."""

    def __init__(self, str dms_name) :
        """Open or create the DMS called `dms_name`; raise Exception if the C layer returns NULL."""
        # Declarations
        cdef bytes dms_name_b

        # Format the character string to send to C function
        dms_name_b = str2bytes(dms_name)

        # Fill structure and create or open the DMS
        self.dms_name = dms_name
        self.pointer = obi_dms(dms_name_b)
        if self.pointer == NULL :
            raise Exception("Failed opening or creating an OBIDMS")

    cpdef close(self) :
        """Close the DMS; raise Exception if the C call returns a negative value."""
        if (obi_close_dms(self.pointer)) < 0 :
            raise Exception("Problem closing an OBIDMS")

    cpdef OBI_Taxonomy open_taxonomy(self, str taxo_name) :
        return OBI_Taxonomy(self, taxo_name)

    cpdef OBIView open_view(self, str view_name) :
        """Open an existing view, using OBIView_NUC_SEQS when its stored type is VIEW_TYPE_NUC_SEQS."""
        cdef object view_class
        cdef dict view_infos

        view_infos = self.read_view_infos(view_name)
        if view_infos["view_type"] == bytes2str(VIEW_TYPE_NUC_SEQS) :
            view_class = OBIView_NUC_SEQS
        else :
            view_class = OBIView

        return view_class(self, view_name)

    cpdef OBIView new_view(self, str view_name, object view_to_clone=None, list line_selection=None, str view_type=None, str comments="") :
        """Create a new view.

        OBIView_NUC_SEQS is used when `view_type` equals VIEW_TYPE_NUC_SEQS;
        any other value, including None, gives a plain OBIView.
        """
        cdef object view_class

        # Always bind view_class: one path previously left it unset and the call below failed
        if view_type is not None and view_type == bytes2str(VIEW_TYPE_NUC_SEQS) :
            view_class = OBIView_NUC_SEQS
        else :
            view_class = OBIView

        return view_class(self, view_name, new=True, view_to_clone=view_to_clone, line_selection=line_selection, comments=comments)

    cpdef dict read_view_infos(self, str view_name) :
        """Return the informations dict of one view; raises KeyError if the name is unknown."""
        all_views = self.read_views()
        return all_views[view_name]

    cpdef dict read_views(self) :    # TODO function that prints the dic nicely and function that prints 1 view nicely. Add column type in col ref
        """Return a dict mapping each view name to a dict of its informations.

        Each entry holds "comments", "view_type", "column_count",
        "line_count", "view_number", "created_from", "creation_date",
        "line_selection" (None, or a dict with "column_name" and "version")
        and "column_references" (column name -> {"version": ...}).
        Raises Exception if the C layer returns NULL.
        """
        cdef Obiviews_infos_all_p all_views_p
        cdef Obiview_infos_p view_p
        cdef Column_reference_p column_refs
        cdef int nb_views
        cdef int i, j
        cdef str view_name
        cdef str column_name
        cdef dict views
        cdef dict infos
        cdef dict column_references

        all_views_p = obi_read_view_infos(self.pointer)
        if all_views_p == NULL :
            raise Exception("No views to read")

        views = {}
        try :
            nb_views = (all_views_p.header).view_count
            for i in range(nb_views) :
                # NOTE(review): the original text may have carried an explicit cast here - confirm against the .pxd
                view_p = ((all_views_p.view_infos)) + i
                view_name = bytes2str(view_p.name)

                infos = {}
                infos["comments"] = bytes2str(view_p.comments)
                infos["view_type"] = bytes2str(view_p.view_type)
                infos["column_count"] = view_p.column_count
                infos["line_count"] = view_p.line_count
                infos["view_number"] = view_p.view_number
                infos["created_from"] = bytes2str(view_p.created_from)
                infos["creation_date"] = bytes2str(obi_format_date(view_p.creation_date))

                if (view_p.all_lines) :
                    infos["line_selection"] = None
                else :
                    infos["line_selection"] = {
                        "column_name": bytes2str((view_p.line_selection).column_name),
                        "version": (view_p.line_selection).version,
                    }

                column_references = {}
                column_refs = view_p.column_references
                for j in range(view_p.column_count) :
                    column_name = bytes2str((column_refs[j]).column_name)
                    column_references[column_name] = {"version": column_refs[j].version}
                infos["column_references"] = column_references

                views[view_name] = infos
        finally :
            # Release the C structure even if a conversion above failed
            obi_close_view_infos(all_views_p)

        return views