#cython: language_level=3

from obitools3.dms.column import typed_column

# Registry mapping (obitype, multi_elts) -> (column class, python type).
# Filled by register_column_class() and read by Column.get_column_class()
# and Column.get_python_type().
__OBIDMS_COLUMN_CLASS__ = {}

from ..capi.obitypes cimport name_data_type, \
                             obitype_t, \
                             OBI_BOOL

from ..capi.obidmscolumn cimport OBIDMS_column_header_p, \
                                 obi_close_column, \
                                 obi_get_elements_names

from ..capi.obiutils cimport obi_format_date

from ..capi.obiview cimport obi_view_add_column, \
                            obi_view_get_pointer_on_column_in_view, \
                            Obiview_p

from ..object cimport OBIObjectClosedInstance

from obitools3.utils cimport tobytes, \
                             bytes2str, \
                             str2bytes

# NOTE(review): duplicate of the import at the top of the file; kept as-is.
from obitools3.dms.column import typed_column

import importlib
import inspect
import pkgutil


cdef class Column(OBIWrapper) :
    '''
    The obitools3.dms.column.Column class wraps a C instance of a column
    in the context of a View.

    Instances are obtained through the static constructors `new_column`
    and `open`, which both register the column with the view.
    '''

    cdef inline OBIDMS_column_p pointer(self) :
        '''
        Returns the C column pointer obtained by dereferencing the wrapped
        pointer.
        '''
        # NOTE(review): the doubled parentheses suggest an explicit pointer
        # cast was lost here -- confirm against the declaration of _pointer.
        return ((self._pointer))[0]

    @staticmethod
    cdef type get_column_class(obitype_t obitype, bint multi_elts):
        '''
        Internal function returning the python class representing
        a column for a given obitype.

        Raises KeyError if no class was registered for (obitype, multi_elts).
        '''
        return __OBIDMS_COLUMN_CLASS__[(obitype, multi_elts)][0]

    @staticmethod
    cdef type get_python_type(obitype_t obitype, bint multi_elts):    # TODO
        '''
        Internal function returning the python type representing
        an instance for a given obitype.

        Raises KeyError if no class was registered for (obitype, multi_elts).
        '''
        return __OBIDMS_COLUMN_CLASS__[(obitype, multi_elts)][1]

    @staticmethod
    def new_column(View view,
                   object column_name,
                   obitype_t data_type,
                   index_t nb_elements_per_line=1,
                   list elements_names=None,
                   object comments=b"",
                   object alias=b""):
        '''
        Creates a new column in `view` and returns it opened under its alias.

        @param view: the view receiving the column, must be active
        @param column_name: name of the column (converted with tobytes)
        @param data_type: the obitype of the column
        @param nb_elements_per_line: number of elements per line
        @param elements_names: optional list of element names, passed to the
                               C layer joined with ';'
        @param comments: comments stored with the column
        @param alias: alias of the column in the view, defaults to
                      `column_name` when left empty

        @raise OBIObjectClosedInstance: if the view is not active
        @raise RuntimeError: if the C layer fails to create the column
        '''
        # TODO indexer_name?

        cdef bytes column_name_b = tobytes(column_name)
        cdef bytes alias_b = tobytes(alias)
        cdef bytes comments_b = tobytes(comments)
        cdef bytes elements_names_b
        cdef char* elements_names_p

        if not view.active() :
            raise OBIObjectClosedInstance()

        # An empty alias means "use the column name as alias".
        if alias_b == b"" :
            alias_b = column_name_b

        if elements_names is not None:
            # elements_names_b must outlive the C call below because
            # elements_names_p is assigned from it.
            elements_names_b = b';'.join([tobytes(x) for x in elements_names])
            elements_names_p = elements_names_b
        else:
            elements_names_p = NULL

        if (obi_view_add_column(view = view.pointer(),
                                column_name = column_name_b,
                                version_number = -1,
                                alias = alias_b,
                                data_type = data_type,
                                nb_lines = len(view),
                                nb_elements_per_line = nb_elements_per_line,
                                elements_names = elements_names_p,
                                indexer_name = NULL,
                                associated_column_name = NULL,
                                associated_column_version = -1,
                                comments = comments_b,
                                create = True)<0):
            raise RuntimeError("Cannot create column %s in view %s" % (bytes2str(column_name_b),
                                                                       bytes2str(view.name)))

        return Column.open(view, alias_b)

    @staticmethod
    def open(View view,
             object column_name):
        '''
        Opens the column known as `column_name` in `view`.

        The returned instance is of the class registered for the column's
        returned data type and for whether it holds more than one element
        per line. It is registered with the view before being returned.

        @raise OBIObjectClosedInstance: if the view is not active
        @raise KeyError: if the view holds no column under that name
        '''
        cdef bytes column_name_b = tobytes(column_name)
        cdef OBIDMS_column_p* column_pp
        cdef OBIDMS_column_p column_p
        cdef Column column
        cdef obitype_t column_type
        cdef type column_class

        if not view.active() :
            raise OBIObjectClosedInstance()

        column_pp = obi_view_get_pointer_on_column_in_view(view.pointer(),
                                                           column_name_b)

        if column_pp == NULL:
            raise KeyError("Cannot access to column %s in view %s" % (
                           bytes2str(column_name_b),
                           bytes2str(view.name)
                           ))

        column_p = column_pp[0]
        column_type = column_p.header.returned_data_type

        column_class = Column.get_column_class(column_type,
                                               (column_p.header.nb_elements_per_line > 1))
        column = OBIWrapper.new(column_class, column_pp)

        column._view = view
        column._alias = column_name_b
        view.register(column)

        return column

    def add_to_view(self,
                    View view,
                    object column_name=None) :
        '''
        Inserts this existing column into another view.

        @param view: the view receiving the column, must be active
        @param column_name: alias of the column in `view`, defaults to the
                            current alias of the column

        @raise OBIObjectClosedInstance: if the view is not active
        @raise RuntimeError: if the C layer fails to insert the column
        '''
        cdef bytes alias
        cdef OBIDMS_column_p column_p = self.pointer()

        if not view.active() :
            raise OBIObjectClosedInstance()

        if (column_name is None):
            alias = self._alias
        else:
            alias = tobytes(column_name)

        # create = False: the column is looked up by name and version, the
        # remaining creation parameters are passed as placeholders.
        if (obi_view_add_column(view = view.pointer(),
                                column_name = column_p.header.name,
                                version_number = column_p.header.version,
                                alias = alias,
                                data_type = 0,
                                nb_lines = -1,
                                nb_elements_per_line = -1,
                                elements_names = NULL,
                                indexer_name = NULL,
                                associated_column_name = NULL,
                                associated_column_version = -1,
                                comments = NULL,
                                create = False) < 0):
            raise RuntimeError("Cannot insert column %s (%s@%d) into view %s" % (
                               bytes2str(alias),
                               bytes2str(column_p.header.name),
                               column_p.header.version,
                               bytes2str(view.name)
                               ))

        view.register(self)

    def __len__(self):
        '''
        implements the len() function for the Column class

            @rtype: `int`
        '''
        return self.lines_used

    def __sizeof__(self):
        '''
        returns the size of the C object wrapped by the Column instance
        (header size plus data size as recorded in the column header)
        '''
        cdef OBIDMS_column_header_p header = self.pointer().header
        return header.header_size + header.data_size

    def __iter__(self):
        '''
        iterates over the used lines of the column, yielding self[line_nb]
        '''
        cdef index_t line_nb

        for line_nb in range(self.lines_used):
            yield self[line_nb]

    def __setitem__(self, index_t line_nb, object value):
        self.set_line(line_nb, value)

    def __getitem__(self, index_t line_nb):
        return self.get_line(line_nb)

    def __str__(self) :
        '''
        returns the str() of every line, each followed by a newline
        '''
        # The loop variable is deliberately untyped: iterating a column
        # yields self[line_nb], which is only a Column_line for the
        # multi-element subclass.
        return ''.join([str(line) + "\n" for line in self])

    def __repr__(self) :
        cdef bytes s
        s = self._alias + b", original name: " + self.original_name + b", version " + str2bytes(str(self.version)) + b", data type: " + self.data_type
        return bytes2str(s)    # TODO can't return bytes

    cpdef close(self):    # TODO discuss, can't be called bc then bug when closing view that tries to close it in C
        '''
        Unregisters the column from its view and closes the wrapper.
        Does nothing if the column is not active. The C column itself is
        not closed here (see the TODO above).
        '''
        cdef OBIDMS_column_p pointer

        if self.active() :
            pointer = self.pointer()
            self._view.unregister(self)
            OBIWrapper.close(self)
            #if obi_close_column(pointer) < 0 :
            #    raise Exception("Problem closing column %s" % bytes2str(self.name))

    # Column alias property getter and setter
    @property
    def name(self):
        return self._alias

    @name.setter
    def name(self, new_alias):    # @DuplicatedSignature
        self._view.rename_column(self._alias, new_alias)

    # elements_names property getter
    @property
    def elements_names(self):
        # The C layer returns the names joined with ';': the result is a
        # list of bytes.
        return obi_get_elements_names(self.pointer()).split(b';')

    # nb_elements_per_line property getter
    @property
    def nb_elements_per_line(self):
        return self.pointer().header.nb_elements_per_line

    # data_type property getter
    @property
    def data_type(self):
        return name_data_type(self.pointer().header.returned_data_type)

    # original_name property getter
    @property
    def original_name(self):
        return self.pointer().header.name

    # version property getter
    @property
    def version(self):
        return self.pointer().header.version

    # lines_used property getter
    @property
    def lines_used(self):
        return self.pointer().header.lines_used

    # comments property getter
    @property
    def comments(self):
        return self.pointer().header.comments

    # creation_date property getter
    @property
    def creation_date(self):
        return obi_format_date(self.pointer().header.creation_date)


######################################################################################################


cdef class Column_multi_elts(Column) :
    '''
    Column holding several named elements per line. Indexing such a column
    returns a Column_line giving access to the elements of that line.
    '''

    def __getitem__(self, index_t line_nb):
        return Column_line(self, line_nb)

    cpdef set_line(self, index_t line_nb, dict values):
        '''
        Sets every element named in `values` on line `line_nb`.
        '''
        for element_name in values :
            self.set_item(line_nb, element_name, values[element_name])


######################################################################################################


cdef class Column_line :
    '''
    View on one line of a column, addressing its elements by identifier.
    '''

    def __init__(self, Column column, index_t line_nb) :
        self._index = line_nb
        self._column = column

    def __getitem__(self, object elt_id) :
        return self._column.get_item(self._index, elt_id)

    def __setitem__(self, object elt_id, object value):
        self._column.set_item(self._index, elt_id, value)

    def __contains__(self, object element_name):
        '''
        Tells whether `element_name` is one of the elements names of the
        column. Both str and bytes names are accepted.
        '''
        # Column.elements_names is a list of bytes: a str name must be
        # converted first, otherwise the membership test can never succeed.
        if isinstance(element_name, str):
            element_name = tobytes(element_name)
        return (element_name in self._column.elements_names)

    def __repr__(self) :
        return str(self._column.get_line(self._index))

    cpdef update(self, data):    # TODO ?????
        '''
        Sets the elements of the line from `data`, a dict or an iterable of
        (key, value) pairs. Keys that are not elements names of the column
        are ignored.
        '''
        if isinstance(data, dict):
            data=data.items()
        for key,value in data:
            if key in self:
                self[key]=value


######################################################################################################


cdef register_column_class(obitype_t obitype,
                           bint multi_elts,
                           type obiclass,
                           type python_type):
    '''
    Each sub class of `Column` needs to be registered after its declaration
    to declare its relationship with an `obitype_t`.

    The class and its python type are stored in the module registry under
    the key (obitype, multi_elts).
    '''
    global __OBIDMS_COLUMN_CLASS__

    assert issubclass(obiclass, Column)

    __OBIDMS_COLUMN_CLASS__[(obitype, multi_elts)] = (obiclass, python_type)


cdef register_all_column_classes() :
    '''
    Imports every module found under the typed_column package and calls its
    `register_class` function.
    '''
    x = list(pkgutil.walk_packages(typed_column.__path__, prefix="obitools3.dms.column.typed_column."))
    # All modules are imported first, registration happens afterwards.
    all_modules = [importlib.import_module(a[1]) for a in x]
    for mod in all_modules :
        getattr(mod, 'register_class')()


# Module-level side effect: fill the registry as soon as the module is loaded.
register_all_column_classes()