Files
obitools3/python/obitools3/dms/view/view.pyx

574 lines
19 KiB
Cython

#cython: language_level=3
cdef dict __VIEW_CLASS__= {}
from libc.stdlib cimport malloc
from ..capi.obiview cimport Alias_column_pair_p, \
obi_new_view, \
obi_open_view, \
obi_clone_view, \
obi_save_and_close_view, \
obi_view_get_pointer_on_column_in_view, \
obi_view_delete_column, \
obi_view_create_column_alias
from ..capi.obidmscolumn cimport OBIDMS_column_p
from ..capi.obidms cimport OBIDMS_p
from obitools3.utils cimport tobytes, \
str2bytes, \
bytes2str
from ..object cimport OBIObjectClosedInstance
from obitools3.dms.view import typed_view
from ..capi.obitypes cimport is_a_DNA_seq, \
OBI_VOID, \
OBI_BOOL, \
OBI_CHAR, \
OBI_FLOAT, \
OBI_INT, \
OBI_QUAL, \
OBI_SEQ, \
OBI_STR
import importlib
import inspect
import pkgutil
cdef class View(OBIWrapper) :
cdef inline Obiview_p pointer(self) :
return <Obiview_p>(self._pointer)
@staticmethod
cdef type get_view_class(bytes view_type):
global __VIEW_CLASS__
return __VIEW_CLASS__.get(view_type, View)
@staticmethod
def new(DMS dms,
object view_name,
object comments=None):
cdef bytes view_name_b = tobytes(view_name)
cdef bytes comments_b
cdef str message
cdef void* pointer
cdef View view # @DuplicatedSignature
if comments is not None:
comments_b = tobytes(comments)
else:
comments_b = b''
pointer = <void*>obi_new_view(<OBIDMS_p>dms._pointer,
view_name_b,
NULL,
NULL,
comments_b)
if pointer == NULL :
message = "Error : Cannot create view %s" % bytes2str(view_name_b)
raise RuntimeError(message)
view = OBIWrapper.new(View, pointer)
view._dms = dms
dms.register(view)
return view
def clone(self,
object view_name,
object comments=None):
cdef bytes view_name_b = tobytes(view_name)
cdef bytes comments_b
cdef void* pointer
cdef View view
if not self.active() :
raise OBIObjectClosedInstance()
if comments is not None:
comments_b = tobytes(comments)
else:
comments_b = b''
pointer = <void*> obi_clone_view(self._dms.pointer(),
self.pointer(),
view_name_b,
NULL,
comments_b)
if pointer == NULL :
raise RuntimeError("Error : Cannot clone view %s into view %s"
% (str(self.name),
bytes2str(view_name_b))
)
view = OBIWrapper.new(type(self), pointer)
view._dms = self._dms
self._dms.register(view)
return view
@staticmethod
def open(DMS dms, # @ReservedAssignment
object view_name):
cdef bytes view_name_b = tobytes(view_name)
cdef void* pointer
cdef View view
cdef type view_class
pointer = <void*> obi_open_view(dms.pointer(),
view_name_b)
if pointer == NULL :
raise RuntimeError("Error : Cannot open view %s" % bytes2str(view_name_b))
view_class = View.get_view_class((<Obiview_p>pointer).infos.view_type)
view = OBIWrapper.new(view_class, pointer)
view._dms = dms
dms.register(view)
return view
cpdef close(self):
cdef Obiview_p pointer = self.pointer()
if self.active() :
self._dms.unregister(self)
OBIWrapper.close(self)
if obi_save_and_close_view(pointer) < 0 :
raise Exception("Problem closing view %s" %
bytes2str(self.name))
def __repr__(self) :
# TODO check everywhere
if not self.active() :
raise OBIObjectClosedInstance()
cdef str s = "{name:s}\n{comments:s}\n{line_count:d} lines\n".format(name = str(self.name),
comments = str(self.comments),
line_count = self.line_count)
for column_name in self.keys() :
s = s + repr(self[column_name]) + '\n'
return s
def keys(self):
cdef str col_alias
cdef int i
cdef Obiview_p pointer = self.pointer()
cdef int nb_column = pointer.infos.column_count
cdef Alias_column_pair_p column_p = pointer.infos.column_references
if not self.active() :
raise OBIObjectClosedInstance()
for i in range(nb_column) :
col_alias = bytes2str(column_p[i].alias)
yield col_alias
def get_column(self,
object column_name):
if not self.active() :
raise OBIObjectClosedInstance()
return Column.open(self, column_name)
def get_column_with_idx(self,
int column_idx):
cdef Obiview_p pointer = self.pointer()
cdef int nb_column = pointer.infos.column_count
if not self.active() :
raise OBIObjectClosedInstance()
if column_idx > nb_column :
raise IndexError(column_idx, "No column with this index")
return Column.open(self, pointer.infos.column_references[column_idx].alias)
cpdef delete_column(self,
object column_name) :
cdef bytes column_name_b = tobytes(column_name)
if not self.active() :
raise OBIObjectClosedInstance()
# Close the cython instance first
col = self[column_name]
col.close()
# Remove the column from the view which closes the C structure
if obi_view_delete_column(self.pointer(), column_name_b) < 0 :
raise Exception("Problem deleting column %s from a view",
bytes2str(column_name_b))
cpdef rename_column(self,
object current_name,
object new_name):
cdef Column column
cdef bytes current_name_b = tobytes(current_name)
cdef bytes new_name_b = tobytes(new_name)
if not self.active() :
raise OBIObjectClosedInstance()
if (obi_view_create_column_alias(self.pointer(),
tobytes(current_name_b),
tobytes(new_name_b)) < 0) :
raise Exception("Problem in renaming column %s to %s" % (
bytes2str(current_name_b),
bytes2str(new_name_b)))
# TODO warning, not multithreading compliant
cpdef Column rewrite_column_with_diff_attributes(self,
object column_name,
obitype_t new_data_type=<obitype_t>OBI_VOID,
index_t new_nb_elements_per_line=0,
list new_elements_names=None) :
cdef Column old_column
cdef Column new_column
cdef index_t length = len(self)
old_column = self.get_column(column_name)
if new_data_type == 0 :
new_data_type = old_column.data_type
if new_nb_elements_per_line == 0 :
new_nb_elements_per_line = old_column.nb_elements_per_line
if new_elements_names is None :
new_elements_names = old_column.elements_names
new_column = Column.new_column(self, old_column.pointer().header.name, new_data_type,
nb_elements_per_line=new_nb_elements_per_line, elements_names=new_elements_names,
comments=old_column.comments, alias=tobytes(column_name)+tobytes('___new___'))
for i in range(length) :
new_column[i] = old_column[i]
# Remove old column from view
self.delete_column(column_name)
# Rename new
new_column.name = column_name
return new_column
cpdef Line_selection new_selection(self,list lines=None):
return Line_selection(self, lines)
def __iter__(self):
# Iteration on each line of all columns
# Declarations
cdef index_t line_nb
cdef Line line
# Yield each line
for line_nb in range(self.line_count) :
line = self[line_nb]
yield line
def __getitem__(self, object item) :
if type(item) == int :
return Line(self, item)
else : # TODO assume str or bytes for optimization?
return self.get_column(item) # TODO hyper lent dans la pratique
def __contains__(self, str column_name):
return (column_name in self.keys())
def __len__(self):
return(self.line_count)
def __str__(self) :
cdef Line line
cdef str to_print
to_print = ""
for line in self :
to_print = to_print + str(line) + "\n"
return to_print
@property
def dms(self):
return self._dms
# line_count property getter
@property
def line_count(self):
return self.pointer().infos.line_count
# name property getter
@property
def name(self):
return <bytes> self.pointer().infos.name
# view type property getter
@property
def type(self): # @ReservedAssignment
return <bytes> self.pointer().infos.view_type
# comments property getter
@property
def comments(self):
return <bytes> self.pointer().infos.comments
# TODO setter that concatenates new comments?
cdef class Line :
def __init__(self, View view, index_t line_nb) :
self._index = line_nb
self._view = view
def __getitem__(self, str column_name) :
return (self._view)[column_name][self._index]
def __setitem__(self, str column_name, object value): # TODO discuss
# TODO detect multiple elements (dict type)? put somewhere else? but more risky (in get)
# TODO OBI_QUAL ?
cdef type value_type
cdef obitype_t value_obitype
cdef bytes value_b
if column_name not in self._view :
if value == None :
raise Exception("Trying to create a column from a None value (can't guess type)")
value_type = type(value)
if value_type == int :
value_obitype = OBI_INT
elif value_type == float :
value_obitype = OBI_FLOAT
elif value_type == bool :
value_obitype = OBI_BOOL
elif value_type == str or value_type == bytes :
if value_type == str :
value_b = str2bytes(value)
else :
value_b = value
if is_a_DNA_seq(value_b) :
value_obitype = OBI_SEQ
elif len(value) == 1 :
value_obitype = OBI_CHAR
elif (len(value) > 1) :
value_obitype = OBI_STR
else :
raise Exception("Could not guess the type of a value to create a new column")
Column.new_column(self._view, column_name, value_obitype)
(self._view)[column_name][self._index] = value
def __iter__(self):
for column_name in (self._view).keys() :
yield self[column_name]
def __contains__(self, str column_name):
return (column_name in self._view.keys())
def __repr__(self):
cdef dict line
cdef str column_name
line = {}
for column_name in self._view.keys() :
line[column_name] = self[column_name]
return str(line)
# cpdef dict get_view_infos(self, str view_name) :
#
# cdef Obiview_infos_p view_infos_p
# cdef dict view_infos_d
# cdef Alias_column_pair_p column_refs
# cdef int i, j
# cdef str column_name
#
# view_infos_p = obi_view_map_file(self._pointer,
# tobytes(view_name))
# view_infos_d = {}
# view_infos_d["name"] = bytes2str(view_infos_p.name)
# view_infos_d["comments"] = bytes2str(view_infos_p.comments)
# view_infos_d["view_type"] = bytes2str(view_infos_p.view_type)
# view_infos_d["column_count"] = <int> view_infos_p.column_count
# view_infos_d["line_count"] = <int> view_infos_p.line_count
# view_infos_d["created_from"] = bytes2str(view_infos_p.created_from)
# view_infos_d["creation_date"] = bytes2str(obi_format_date(view_infos_p.creation_date))
# if (view_infos_p.all_lines) :
# view_infos_d["line_selection"] = None
# else :
# view_infos_d["line_selection"] = {}
# view_infos_d["line_selection"]["column_name"] = bytes2str((view_infos_p.line_selection).column_name)
# view_infos_d["line_selection"]["version"] = <int> (view_infos_p.line_selection).version
# view_infos_d["column_references"] = {}
# column_references = view_infos_p.column_references
# for j in range(view_infos_d["column_count"]) :
# column_name = bytes2str((column_references[j]).alias)
# view_infos_d["column_references"][column_name] = {}
# view_infos_d["column_references"][column_name]["original_name"] = bytes2str((column_references[j]).column_refs.column_name)
# view_infos_d["column_references"][column_name]["version"] = (column_references[j]).column_refs.version
#
# obi_view_unmap_file(self._pointer, view_infos_p)
#
# return view_infos_d
cdef class Line_selection(list):
def __init__(self, View view, lines=None) :
if view._pointer == NULL:
raise Exception("Error: trying to create a line selection with an invalidated view")
self._view = view
self._view_name = view.name
if lines is not None:
self.extend(lines)
def extend(self, iterable):
cdef index_t i
cdef index_t max_i = self._view.line_count
for i in iterable: # TODO this is already checked in C
if i > max_i:
raise RuntimeError("Error: trying to select line %d beyond the line count %d of view %s" %
(i,
max_i,
self._view_name)
)
list.append(self,i)
def append(self, index_t idx) :
if idx >= self._view.line_count :
raise IndexError("Error: trying to select line %d beyond the line count %d of view %s" %
(idx,
self._view.line_count,
bytes2str(self.name))
)
list.append(self,idx)
cdef index_t* __build_binary_list__(self):
cdef index_t* line_selection_p = NULL
cdef int i
cdef size_t l_selection = len(self)
line_selection_p = <index_t*> malloc((l_selection + 1) * sizeof(index_t)) # +1 for the -1 flagging the end of the array
for i in range(l_selection) :
line_selection_p[i] = self[i]
line_selection_p[l_selection] = -1 # flagging the end of the array
return line_selection_p
cpdef View materialize(self,
object view_name,
object comments=""):
cdef bytes view_name_b = tobytes(view_name)
cdef bytes comments_b
cdef Obiview_p pointer
cdef View view
if not self._view.active() :
raise OBIObjectClosedInstance()
if comments is not None:
comments_b = tobytes(comments)
else:
comments_b = b''
pointer = obi_clone_view(self._view._dms.pointer(),
self._view.pointer(),
view_name_b,
self.__build_binary_list__(),
comments_b)
if pointer == NULL :
raise RuntimeError("Error : Cannot clone view %s into view %s with new line selection"
% (str(self._view.name),
bytes2str(view_name_b))
)
view = OBIWrapper.new(type(self._view), pointer)
view._dms = self._view._dms
view._dms.register(view)
return view
#############################################################
cdef register_view_class(bytes view_type_name,
type view_class):
'''
Each subclass of `dms.view` needs to be registered after its declaration
'''
global __VIEW_CLASS__
assert issubclass(view_class, View)
__VIEW_CLASS__[view_type_name] = view_class
cdef register_all_view_classes() :
x = list(pkgutil.walk_packages(typed_view.__path__, prefix="obitools3.dms.view.typed_view."))
all_modules = [importlib.import_module(a[1]) for a in x]
for mod in all_modules :
getattr(mod, 'register_class')()
register_all_view_classes()