Files
obitools3/python/obitools3/obidms/_obiview.pyx
2017-02-17 15:14:06 +01:00

427 lines
14 KiB
Cython

#cython: language_level=3
from obitools3.utils cimport bytes2str, str2bytes
from .capi.obitypes cimport OBI_INT, \
OBI_FLOAT, \
OBI_BOOL, \
OBI_CHAR, \
OBI_QUAL, \
OBI_STR, \
OBI_SEQ, \
only_ATGC
from .capi.obiview cimport Obiview_p, \
obi_new_view, \
obi_open_view, \
obi_view_delete_column, \
obi_view_add_column, \
obi_view_create_column_alias, \
obi_view_get_column, \
obi_view_get_pointer_on_column_in_view, \
obi_save_and_close_view
from .capi.obidmscolumn cimport OBIDMS_column_p
from ._obidms cimport OBIDMS, \
OBIDMS_column
from libc.stdlib cimport malloc
cdef class OBIView :
def __init__(self,int __internalCall__):
if __internalCall__!=987654:
raise RuntimeError('OBIView constructor cannot be called directly')
self._pointer = NULL
self._columns = {}
cdef __init_columns__(self):
cdef size_t i
cdef str col_alias
cdef OBIDMS_column_p column_p
cdef Obiview_p pointer = self._pointer
self._columns = {}
for i in range(pointer.infos.column_count) :
col_alias = bytes2str(pointer.infos.column_references[i].alias)
column_p = <OBIDMS_column_p> (pointer.columns)[i]
subclass = OBIDMS_column.get_subclass_type(column_p)
self._columns[col_alias] = subclass(self, col_alias)
cpdef OBIView clone(self,
str view_name,
str comments=""):
cdef OBIView view = OBIView(987654)
view._pointer = obi_new_view(self._pointer.dms,
str2bytes(view_name),
self._pointer,
NULL,
str2bytes(comments))
if view._pointer == NULL :
raise RuntimeError("Error : Cannot clone view %s into view %s"
% (str(self.name),
view_name)
)
view.__init_columns__()
return view
@staticmethod
cdef OBIView new(OBIDMS dms,
str view_name,
str comments=""):
cdef OBIView view = OBIView(987654) # @DuplicatedSignature
view._pointer = obi_new_view(dms._pointer,
str2bytes(view_name),
NULL,
NULL,
str2bytes(comments))
if view._pointer == NULL :
raise RuntimeError("Error : Cannot create view %s" % view_name)
view.__init_columns__()
return view
@staticmethod
cdef OBIView open(OBIDMS dms,
str view_name):
cdef OBIView view = OBIView(987654) # @DuplicatedSignature
view._pointer = obi_open_view(dms._pointer,
str2bytes(view_name))
if view._pointer == NULL :
raise RuntimeError("Error : Cannot open view %s" % view_name)
view.__init_columns__()
return view
def __dealloc__(self):
if (obi_save_and_close_view(self._pointer) < 0) :
raise Exception("Problem closing a view")
def __repr__(self) :
cdef str s
s = str(self.name) + "\n" + str(self.comments) + "\n" + str(self.line_count) + " lines\n"
for column_name in self._columns :
s = s + repr(self._columns[column_name]) + '\n'
return s
cpdef delete_column(self, str column_name) :
cdef str column_n
if obi_view_delete_column(self._pointer, str2bytes(column_name)) < 0 :
raise Exception("Problem deleting a column from a view")
# Update the dictionary of column objects:
(self._columns).pop(column_name)
self.update_column_pointers()
cpdef add_column(self,
str column_name,
obiversion_t version_number=-1,
str alias='',
str type='',
index_t nb_lines=0,
index_t nb_elements_per_line=1,
list elements_names=None,
str indexer_name="",
str associated_column_name="",
obiversion_t associated_column_version=-1,
str comments="",
bint create=True
) :
cdef bytes column_name_b
cdef bytes elements_names_b
cdef object subclass
cdef OBIDMS_column_p column_p
column_name_b = str2bytes(column_name)
if alias == '' :
alias = column_name
alias_b = column_name_b
else :
alias_b = str2bytes(alias)
if elements_names is None :
elements_names_b = str2bytes("")
else :
elements_names_b = str2bytes(';'.join(elements_names))
if type : # TODO make C function that does that
if type == 'OBI_INT' :
data_type = OBI_INT
elif type == 'OBI_FLOAT' :
data_type = OBI_FLOAT
elif type == 'OBI_BOOL' :
data_type = OBI_BOOL
elif type == 'OBI_CHAR' :
data_type = OBI_CHAR
elif type == 'OBI_QUAL' :
data_type = OBI_QUAL
elif type == 'OBI_STR' :
data_type = OBI_STR
elif type == 'OBI_SEQ' :
data_type = OBI_SEQ
else :
raise Exception("Invalid provided data type")
if (obi_view_add_column(self._pointer, column_name_b, version_number, alias_b,
data_type, nb_lines, nb_elements_per_line,
elements_names_b, str2bytes(indexer_name),
str2bytes(associated_column_name), associated_column_version,
str2bytes(comments), create) < 0) :
raise Exception("Problem adding a column in a view")
# Get the column pointer
column_p = obi_view_get_column(self._pointer, alias_b)
# Open and store the subclass
subclass = OBIDMS_column.get_subclass_type(column_p)
(self._columns)[alias] = subclass(self, alias)
cpdef change_column_alias(self, str current_alias, str new_alias):
cdef OBIDMS_column column
if (obi_view_create_column_alias(self._pointer, str2bytes(current_alias), str2bytes(new_alias)) < 0) :
raise Exception("Problem changing a column alias")
# Update the dictionaries of column objects
self._columns[new_alias] = self._columns[current_alias]
column = self._columns[new_alias]
column._alias = new_alias
(self._columns).pop(current_alias)
cpdef update_column_pointers(self):
cdef str column_n
cdef OBIDMS_column column
for column_n in self._columns :
column = self._columns[column_n]
column._pointer = <OBIDMS_column_p*> obi_view_get_pointer_on_column_in_view(self._pointer, str2bytes(column_n))
cpdef OBIView_line_selection new_selection(self,list lines=None):
return OBIView_line_selection(self,lines)
def __iter__(self):
# Iteration on each line of all columns
# Declarations
cdef index_t line_nb
cdef OBIView_line line
# Yield each line
for line_nb in range(self.line_count) :
line = self[line_nb]
yield line
def __getitem__(self, object item) :
if type(item) == str :
return (self._columns)[item]
elif type(item) == int :
return OBIView_line(self, item)
def __contains__(self, str column_name):
return (column_name in self._columns)
def __len__(self):
return(self.line_count)
def __str__(self) :
cdef OBIView_line line
cdef str to_print
to_print = ""
for line in self :
to_print = to_print + str(line) + "\n"
return to_print
# line_count property getter
@property
def line_count(self):
return self._pointer.infos.line_count
# name property getter
@property
def name(self):
return bytes2str(self._pointer.infos.name)
# view type property getter
@property
def type(self): # @ReservedAssignment
return bytes2str(self._pointer.infos.view_type)
# columns property getter
@property
def columns(self):
return self._columns
# comments property getter
@property
def comments(self):
return bytes2str(self._pointer.infos.comments)
# TODO setter that concatenates new comments?
cdef class OBIView_line :
def __init__(self, OBIView view, index_t line_nb) :
self._index = line_nb
self._view = view
def __getitem__(self, str column_name) :
return ((self._view)._columns)[column_name][self._index]
def __setitem__(self, str column_name, object value):
# TODO detect multiple elements (dict type)? put somewhere else? but more risky (in get)
# TODO OBI_QUAL ?
cdef type value_type
cdef str value_obitype
cdef bytes value_b
if column_name not in self._view :
if value == None :
raise Exception("Trying to create a column from a None value (can't guess type)")
value_type = type(value)
if value_type == int :
value_obitype = 'OBI_INT'
elif value_type == float :
value_obitype = 'OBI_FLOAT'
elif value_type == bool :
value_obitype = 'OBI_BOOL'
elif value_type == str or value_type == bytes :
if value_type == str :
value_b = str2bytes(value)
else :
value_b = value
if only_ATGC(value_b) : # TODO detect IUPAC
value_obitype = 'OBI_SEQ'
elif len(value) == 1 :
value_obitype = 'OBI_CHAR'
elif (len(value) > 1) :
value_obitype = 'OBI_STR'
else :
raise Exception("Could not guess the type of a value to create a new column")
self._view.add_column(column_name, type=value_obitype)
(((self._view)._columns)[column_name]).set_line(self._index, value)
def __iter__(self):
for column_name in ((self._view)._columns) :
yield column_name
def __contains__(self, str column_name):
return (column_name in self._view._columns)
def __repr__(self):
cdef dict line
cdef str column_name
line = {}
for column_name in self._view._columns :
line[column_name] = self[column_name]
return str(line)
cdef class OBIView_line_selection(list):
def __init__(self, OBIView view, lines=None) :
if view._pointer == NULL:
raise Exception("Error: trying to create a line selection with an invalidated view")
self._view = view
self._view_name = view.name
if lines is not None:
self.extend(lines)
def extend(self, iterable):
cdef index_t i
cdef index_t max_i = self._view.line_count
for i in iterable:
if i > max_i:
raise RuntimeError("Error: trying to select line %d beyond the line count %d of view %s" %
(i,
max_i,
self._view_name)
)
list.append(self,i)
def append(self, index_t idx) :
if idx >= self._view.line_count :
raise RuntimeError("Error: trying to select line %d beyond the line count %d of view %s" %
(idx,
self._view.line_count,
self._view_name)
)
list.append(self,idx)
cdef index_t* __build_binary_list__(self):
cdef index_t* line_selection_p = NULL
cdef int i
cdef size_t len_selection = len(self)
line_selection_p = <index_t*> malloc((len_selection + 1) * sizeof(index_t)) # +1 for the -1 flagging the end of the array
for i in range(len_selection) :
line_selection_p[i] = self[i]
line_selection_p[len_selection] = -1 # flagging the end of the array
return line_selection_p
cpdef OBIView materialize(self,
str view_name,
str comments=""):
cdef OBIView view = OBIView(987654)
view._pointer = obi_new_view(self._view._pointer.dms,
str2bytes(view_name),
self._view._pointer,
self.__build_binary_list__(),
str2bytes(comments))
if view._pointer == NULL :
raise RuntimeError("Error: Cannot clone view %s into view %s"
% (str(self.name),
view_name)
)
view.__init_columns__()
return view