751 lines
29 KiB
Cython
751 lines
29 KiB
Cython
#cython: language_level=3
|
|
|
|
from obitools3.utils cimport bytes2str, str2bytes
|
|
|
|
from .capi.obidms cimport obi_dms, \
|
|
obi_close_dms
|
|
|
|
from .capi.obidmscolumn cimport obi_close_column, \
|
|
OBIDMS_column_p, \
|
|
OBIDMS_column_header_p
|
|
|
|
from .capi.obiutils cimport obi_format_date
|
|
|
|
from .capi.obitypes cimport const_char_p, \
|
|
OBIType_t, \
|
|
OBI_INT, \
|
|
OBI_FLOAT, \
|
|
OBI_BOOL, \
|
|
OBI_CHAR, \
|
|
OBI_QUAL, \
|
|
OBI_STR, \
|
|
OBI_SEQ, \
|
|
name_data_type, \
|
|
only_ATGC # discuss
|
|
|
|
from ._obidms cimport OBIDMS, \
|
|
OBIDMS_column, \
|
|
OBIView, \
|
|
OBIView_line
|
|
|
|
from ._obitaxo cimport OBI_Taxonomy
|
|
|
|
from ._obidmscolumn_int cimport OBIDMS_column_int, \
|
|
OBIDMS_column_multi_elts_int
|
|
|
|
from ._obidmscolumn_float cimport OBIDMS_column_float, \
|
|
OBIDMS_column_multi_elts_float
|
|
|
|
from ._obidmscolumn_bool cimport OBIDMS_column_bool, \
|
|
OBIDMS_column_multi_elts_bool
|
|
|
|
from ._obidmscolumn_char cimport OBIDMS_column_char, \
|
|
OBIDMS_column_multi_elts_char
|
|
|
|
from ._obidmscolumn_qual cimport OBIDMS_column_qual, \
|
|
OBIDMS_column_multi_elts_qual
|
|
|
|
from ._obidmscolumn_str cimport OBIDMS_column_str, \
|
|
OBIDMS_column_multi_elts_str
|
|
|
|
from ._obidmscolumn_seq cimport OBIDMS_column_seq, \
|
|
OBIDMS_column_multi_elts_seq
|
|
|
|
from .capi.obiview cimport Obiview_p, \
|
|
Obiview_infos_p, \
|
|
Alias_column_pair_p, \
|
|
obi_new_view_nuc_seqs, \
|
|
obi_new_view, \
|
|
obi_new_view_cloned_from_name, \
|
|
obi_new_view_nuc_seqs_cloned_from_name, \
|
|
obi_view_map_file, \
|
|
obi_view_unmap_file, \
|
|
obi_open_view, \
|
|
obi_view_delete_column, \
|
|
obi_view_add_column, \
|
|
obi_view_create_column_alias, \
|
|
obi_view_get_column, \
|
|
obi_view_get_pointer_on_column_in_view, \
|
|
obi_save_and_close_view, \
|
|
VIEW_TYPE_NUC_SEQS, \
|
|
NUC_SEQUENCE_COLUMN, \
|
|
ID_COLUMN, \
|
|
DEFINITION_COLUMN, \
|
|
QUALITY_COLUMN
|
|
|
|
from libc.stdlib cimport malloc
|
|
|
|
|
|
cdef class OBIDMS_column :
    """Python-side handle on one column of a view.

    Note: should only be initialized through a subclass (the typed subclasses
    are the ones chosen by get_subclass_type()).
    """

    def __init__(self, OBIView view, str column_alias):
        # Build the handle for the column known as `column_alias` in `view`.
        cdef OBIDMS_column_p* column_pp

        column_pp = obi_view_get_pointer_on_column_in_view(view._pointer, str2bytes(column_alias))
        # Every accessor below dereferences this pointer, so refuse to build
        # an object around NULL (presumably how the C layer reports an
        # unknown alias -- TODO confirm against the C API).
        if column_pp == NULL :
            raise Exception("Problem getting the pointer on a column in a view")

        # Fill structure
        self._alias   = column_alias
        self._pointer = column_pp
        self._view    = view

    def __setitem__(self, index_t line_nb, object value):
        self.set_line(line_nb, value)

    def __getitem__(self, index_t line_nb):
        return self.get_line(line_nb)

    def __len__(self):
        return self.lines_used

    def __sizeof__(self):
        # Header size plus data size, as recorded in the column header
        return ((self._pointer)[0].header.header_size + (self._pointer)[0].header.data_size)

    def __iter__(self):
        # Declarations
        cdef index_t line_nb
        # Yield each line
        for line_nb in range(self.lines_used):
            yield self.get_line(line_nb)

    def __str__(self) :
        # One line of text per column line; join() avoids rebuilding the
        # whole string at each iteration.
        return "".join([str(line) + "\n" for line in self])

    def __repr__(self) :
        return (self._alias + ", original name: " + self.original_name + ", version " + str(self.version) + ", data type: " + self.data_type)

    cpdef close(self):
        """Close the underlying column.

        Raises Exception if the C function reports an error (negative return value).
        """
        if obi_close_column((self._pointer)[0]) < 0 :
            raise Exception("Problem closing a column")

    # Column alias property getter and setter
    @property
    def alias(self):
        return self._alias

    @alias.setter
    def alias(self, new_alias):    # @DuplicatedSignature
        # Delegated to the view, which also updates this object's _alias
        self._view.change_column_alias(self._alias, new_alias)

    # elements_names property getter
    @property
    def elements_names(self):
        # The header stores the names as one ';'-separated string
        return (bytes2str(((self._pointer)[0].header).elements_names)).split(';')

    # nb_elements_per_line property getter
    @property
    def nb_elements_per_line(self):
        return ((self._pointer)[0].header).nb_elements_per_line

    # data_type property getter
    @property
    def data_type(self):
        return bytes2str(name_data_type(((self._pointer)[0].header).returned_data_type))

    # original_name property getter
    @property
    def original_name(self):
        return bytes2str(((self._pointer)[0].header).name)

    # version property getter
    @property
    def version(self):
        return ((self._pointer)[0].header).version

    # lines_used property getter
    @property
    def lines_used(self):
        return (self._pointer)[0].header.lines_used

    # comments property getter
    @property
    def comments(self):
        return bytes2str((self._pointer)[0].header.comments)

    # creation_date property getter
    @property
    def creation_date(self):
        return bytes2str(obi_format_date((self._pointer)[0].header.creation_date))

    @staticmethod
    cdef object get_subclass_type(OBIDMS_column_p column_p) :
        # Return the Python class to instantiate for a C column, chosen from
        # the data type returned by the column and from whether each line
        # holds one or several elements.
        # Raises Exception if the data type is not one of the handled types.
        cdef object    subclass
        cdef OBIDMS_column_header_p header
        cdef OBIType_t col_type
        cdef bint      col_one_element_per_line

        header = column_p.header
        col_type = header.returned_data_type
        col_one_element_per_line = ((header.nb_elements_per_line) == 1)

        if col_type == OBI_INT :
            subclass = OBIDMS_column_int if col_one_element_per_line else OBIDMS_column_multi_elts_int
        elif col_type == OBI_FLOAT :
            subclass = OBIDMS_column_float if col_one_element_per_line else OBIDMS_column_multi_elts_float
        elif col_type == OBI_BOOL :
            subclass = OBIDMS_column_bool if col_one_element_per_line else OBIDMS_column_multi_elts_bool
        elif col_type == OBI_CHAR :
            subclass = OBIDMS_column_char if col_one_element_per_line else OBIDMS_column_multi_elts_char
        elif col_type == OBI_QUAL :
            subclass = OBIDMS_column_qual if col_one_element_per_line else OBIDMS_column_multi_elts_qual
        elif col_type == OBI_STR :
            subclass = OBIDMS_column_str if col_one_element_per_line else OBIDMS_column_multi_elts_str
        elif col_type == OBI_SEQ :
            subclass = OBIDMS_column_seq if col_one_element_per_line else OBIDMS_column_multi_elts_seq
        else :
            raise Exception("Problem with the data type")

        return subclass
|
|
|
|
|
|
######################################################################################################
|
|
|
|
|
|
cdef class OBIDMS_column_multi_elts(OBIDMS_column) :
    """Column whose lines each hold several named elements."""

    def __getitem__(self, index_t line_nb):
        # A line of a multi-element column is exposed through a small proxy
        # object giving access to its elements by name.
        return OBIDMS_column_line(self, line_nb)

    cpdef set_line(self, index_t line_nb, dict values):
        """Set the elements of line `line_nb` from an {element name: value} dict.

        Only the elements present in `values` are written.
        """
        for name, element_value in values.items() :
            self.set_item(line_nb, name, element_value)
|
|
|
|
|
|
######################################################################################################
|
|
|
|
|
|
cdef class OBIDMS_column_line :
    """Proxy on one line of a column, with its elements addressed by name."""

    def __init__(self, OBIDMS_column column, index_t line_nb) :
        # Only the column and the line index are kept; values are read from
        # and written to the column on each access.
        self._column = column
        self._index  = line_nb

    def __getitem__(self, str element_name) :
        col = self._column
        return col.get_item(self._index, element_name)

    def __setitem__(self, str element_name, object value):
        col = self._column
        col.set_item(self._index, element_name, value)

    def __contains__(self, str element_name):
        # True when the column declares an element with that name
        known_names = self._column.elements_names
        return element_name in known_names

    def __repr__(self) :
        # Textual form of the whole line as returned by the column
        whole_line = self._column.get_line(self._index)
        return str(whole_line)
|
|
|
|
|
|
######################################################################################################
|
|
|
|
|
|
cdef class OBIView :
    """Python-side handle on a view of an OBIDMS.

    Holds the C view pointer and a dictionary mapping each column alias to
    the Python column object wrapping it.
    """

    def __init__(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, OBIView_line_selection line_selection=None, str comments="", bint quality_column=False, str view_type=""):
        # Open the view `view_name` of `dms`, or create it when `new` is true
        # (optionally cloning `view_to_clone`, restricted to `line_selection`).
        # Raises MemoryError if the line selection array can not be allocated
        # and Exception if the view can not be created/opened.

        cdef index_t* line_selection_p = NULL
        cdef index_t  selection_size
        cdef index_t  k
        cdef int      i
        cdef str      col_alias
        cdef OBIDMS_column_p column_p
        cdef object   subclass

        if line_selection is not None :
            # Get the name of the associated view to clone
            view_to_clone = line_selection._view_name    # TODO discuss. This makes it possible for the view to clone to be closed. If a view to clone was given it is not checked.
            # Build the C array corresponding to the line selection
            selection_size = len(line_selection)
            line_selection_p = <index_t*> malloc((selection_size + 1) * sizeof(index_t))    # +1 for the -1 flagging the end of the array
            if line_selection_p == NULL :
                raise MemoryError("Could not allocate memory for a line selection")
            for k in range(selection_size) :
                line_selection_p[k] = line_selection[k]
            line_selection_p[selection_size] = -1    # flagging the end of the array
            # NOTE(review): this array is never freed here. Whether the C
            # layer copies it or keeps the pointer is not visible from this
            # file -- confirm before adding a free().

        self._pointer = self._open_or_create_view(dms, view_name, new=new, view_to_clone=view_to_clone, line_selection_p=line_selection_p, comments=comments, quality_column=quality_column, view_type=view_type)

        # Depending on how exceptions propagate out of the cdef method above,
        # a failure may come back as a NULL pointer rather than as an
        # exception; never go on with a NULL view.
        if self._pointer == NULL :
            raise Exception("Error creating/opening a view")

        # Go through columns to build dictionaries of corresponding python instances    # TODO make function?
        self._columns = {}
        for i in range(self._pointer.infos.column_count) :
            col_alias = bytes2str(self._pointer.infos.column_references[i].alias)
            column_p = <OBIDMS_column_p> (self._pointer.columns)[i]
            subclass = OBIDMS_column.get_subclass_type(column_p)
            self._columns[col_alias] = subclass(self, col_alias)


    cdef Obiview_p _open_or_create_view(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, index_t* line_selection_p=NULL, str comments="", bint quality_column=False, str view_type=""):
        # Return the C view pointer, creating the view with the right type
        # when `new` is true and opening the existing one otherwise.
        # `view_to_clone` may be None, a view name (str) or an OBIView.
        # Raises Exception on an unknown view type or if the C layer returns NULL.

        cdef Obiview_p view = NULL
        cdef Obiview_p clone_p = NULL
        cdef bint      clone_from_name
        cdef bytes     view_name_b
        cdef bytes     comments_b

        view_name_b = str2bytes(view_name)

        if new :
            comments_b = str2bytes(comments)
            clone_from_name = isinstance(view_to_clone, str)
            if (view_to_clone is not None) and (not clone_from_name) :
                # Checked cast: raises TypeError instead of reading garbage
                # if the object is not an OBIView.
                clone_p = (<OBIView?> view_to_clone)._pointer

            # Create the view with the right type
            if view_type == "" :
                if clone_from_name :
                    view = obi_new_view_cloned_from_name(dms._pointer, view_name_b, str2bytes(view_to_clone), line_selection_p, comments_b)
                else :
                    # clone_p is NULL when there is no view to clone
                    view = obi_new_view(dms._pointer, view_name_b, clone_p, line_selection_p, comments_b)
            elif view_type == bytes2str(VIEW_TYPE_NUC_SEQS) :
                if clone_from_name :
                    view = obi_new_view_nuc_seqs_cloned_from_name(dms._pointer, view_name_b, str2bytes(view_to_clone), line_selection_p, comments_b, quality_column)
                else :
                    view = obi_new_view_nuc_seqs(dms._pointer, view_name_b, clone_p, line_selection_p, comments_b, quality_column)
            else :
                raise Exception("View type not recognized")

        # Else, open the existing view
        else :
            view = obi_open_view(dms._pointer, view_name_b)

        if view == NULL :
            raise Exception("Error creating/opening a view")

        return view


    def __repr__(self) :
        # Name, comments and line count, then one line per column
        cdef str s
        s = str(self.name) + "\n" + str(self.comments) + "\n" + str(self.line_count) + " lines\n"
        return s + "".join([repr(column) + '\n' for column in self._columns.values()])


    cpdef delete_column(self, str column_name) :
        """Delete the column known as `column_name` from the view.

        Raises Exception if the C function reports an error.
        """
        if obi_view_delete_column(self._pointer, str2bytes(column_name)) < 0 :
            raise Exception("Problem deleting a column from a view")
        # Update the dictionary of column objects:
        (self._columns).pop(column_name)
        # The remaining column objects get their pointers refreshed
        self.update_column_pointers()


    cpdef add_column(self,
                     str column_name,
                     obiversion_t version_number=-1,
                     str alias='',
                     str type='',
                     index_t nb_lines=0,
                     index_t nb_elements_per_line=1,
                     list elements_names=None,
                     str indexer_name="",
                     str associated_column_name="",
                     obiversion_t associated_column_version=-1,
                     str comments="",
                     bint create=True
                    ) :
        """Add a column to the view and register its Python object.

        `type` is one of 'OBI_INT', 'OBI_FLOAT', 'OBI_BOOL', 'OBI_CHAR',
        'OBI_QUAL', 'OBI_STR', 'OBI_SEQ'. When `alias` is empty the column
        name is used as alias.

        Raises Exception if the data type is invalid, if it is missing while
        a column has to be created, or if the C layer reports an error.
        """

        cdef bytes     column_name_b
        cdef bytes     alias_b
        cdef bytes     elements_names_b
        cdef object    subclass
        cdef OBIDMS_column_p column_p
        # Value sent when no type is given and no column has to be created.
        # NOTE(review): assumes the C layer does not read the data type when
        # `create` is false -- confirm against obi_view_add_column().
        cdef OBIType_t data_type = <OBIType_t> 0

        column_name_b = str2bytes(column_name)
        if alias == '' :
            alias = column_name
            alias_b = column_name_b
        else :
            alias_b = str2bytes(alias)

        if elements_names is None :
            elements_names_b = str2bytes("")
        else :
            elements_names_b = str2bytes(';'.join(elements_names))

        if type :    # TODO make C function that does that
            if type == 'OBI_INT' :
                data_type = OBI_INT
            elif type == 'OBI_FLOAT' :
                data_type = OBI_FLOAT
            elif type == 'OBI_BOOL' :
                data_type = OBI_BOOL
            elif type == 'OBI_CHAR' :
                data_type = OBI_CHAR
            elif type == 'OBI_QUAL' :
                data_type = OBI_QUAL
            elif type == 'OBI_STR' :
                data_type = OBI_STR
            elif type == 'OBI_SEQ' :
                data_type = OBI_SEQ
            else :
                raise Exception("Invalid provided data type")
        elif create :
            # Without this check the data type would be used before being
            # assigned, failing with an obscure UnboundLocalError.
            raise Exception("A data type is needed to create a column")

        if (obi_view_add_column(self._pointer, column_name_b, version_number, alias_b,
                                data_type, nb_lines, nb_elements_per_line,
                                elements_names_b, str2bytes(indexer_name),
                                str2bytes(associated_column_name), associated_column_version,
                                str2bytes(comments), create) < 0) :
            raise Exception("Problem adding a column in a view")

        # Get the column pointer
        column_p = obi_view_get_column(self._pointer, alias_b)
        if column_p == NULL :
            # get_subclass_type() dereferences the pointer
            raise Exception("Problem getting a column that was added in a view")

        # Open and store the subclass
        subclass = OBIDMS_column.get_subclass_type(column_p)
        (self._columns)[alias] = subclass(self, alias)


    cpdef change_column_alias(self, str current_alias, str new_alias):
        """Rename the alias of a column in the view.

        Raises Exception if the C function reports an error.
        """
        cdef OBIDMS_column column
        if (obi_view_create_column_alias(self._pointer, str2bytes(current_alias), str2bytes(new_alias)) < 0) :
            raise Exception("Problem changing a column alias")
        # Update the dictionaries of column objects.
        # Pop first, then store: the column is kept even if both aliases are equal.
        column = (self._columns).pop(current_alias)
        column._alias = new_alias
        self._columns[new_alias] = column


    cpdef update_column_pointers(self):
        """Refresh the C pointer stored in each column object of the view."""
        cdef str column_n
        cdef OBIDMS_column column
        for column_n in self._columns :
            column = self._columns[column_n]
            column._pointer = <OBIDMS_column_p*> obi_view_get_pointer_on_column_in_view(self._pointer, str2bytes(column_n))


    cpdef close(self) :
        """Save and close the view, then invalidate this object.

        Raises Exception if the C function reports an error.
        """
        if (obi_save_and_close_view(self._pointer) < 0) :
            raise Exception("Problem closing a view")
        self._pointer = NULL
        self._columns = {}


    def __iter__(self):
        # Iteration on each line of all columns

        # Declarations
        cdef index_t line_nb
        cdef OBIView_line line

        # Yield each line
        for line_nb in range(self.line_count) :
            line = self[line_nb]
            yield line


    def __getitem__(self, object item) :
        # A column alias (str) gives the column object, a line number (int)
        # gives an OBIView_line. Anything else is an error instead of
        # silently evaluating to None.
        if isinstance(item, str) :
            return (self._columns)[item]
        elif isinstance(item, int) :
            return OBIView_line(self, item)
        raise TypeError("A view is indexed by a column alias (str) or a line number (int)")


    def __contains__(self, str column_name):
        return (column_name in self._columns)


    def __len__(self):
        return(self.line_count)


    def __str__(self) :
        # One line of text per view line
        return "".join([str(line) + "\n" for line in self])


    # line_count property getter
    @property
    def line_count(self):
        return self._pointer.infos.line_count

    # name property getter
    @property
    def name(self):
        return bytes2str(self._pointer.infos.name)

    # view type property getter
    @property
    def type(self):    # @ReservedAssignment
        return bytes2str(self._pointer.infos.view_type)

    # columns property getter
    @property
    def columns(self):
        return self._columns

    # comments property getter
    @property
    def comments(self):
        return bytes2str(self._pointer.infos.comments)
    # TODO setter that concatenates new comments?


    @staticmethod
    cdef object get_view_subclass(str view_type) :
        # Return the view class matching a view type name: OBIView_NUC_SEQS
        # for the nucleotide sequences type, OBIView for anything else.
        cdef object view_class
        if view_type == bytes2str(VIEW_TYPE_NUC_SEQS) :
            view_class = OBIView_NUC_SEQS
        else :
            view_class = OBIView
        return view_class
|
|
|
|
|
|
######################################################################################################
|
|
|
|
|
|
|
|
######################################################################################################
|
|
|
|
|
|
cdef class OBIView_line :
    """Proxy on one line of a view, with its values addressed by column alias."""

    def __init__(self, OBIView view, index_t line_nb) :
        self._index = line_nb
        self._view = view

    def __getitem__(self, str column_name) :
        return ((self._view)._columns)[column_name][self._index]

    def __setitem__(self, str column_name, object value):
        # Set the value of this line in a column. If the view has no column
        # with that alias, one is created first, its type being guessed from
        # the Python type of the value.
        # Raises Exception if the type can not be guessed.
        # TODO detect multiple elements (dict type)? put somewhere else? but more risky (in get)
        # TODO OBI_QUAL ?
        cdef type  value_type
        cdef str   value_obitype = None
        cdef bytes value_b

        if column_name not in self._view :
            if value is None :
                raise Exception("Trying to create a column from a None value (can't guess type)")
            # Exact type comparisons: a bool must not be taken for an int
            value_type = type(value)
            if value_type is int :
                value_obitype = 'OBI_INT'
            elif value_type is float :
                value_obitype = 'OBI_FLOAT'
            elif value_type is bool :
                value_obitype = 'OBI_BOOL'
            elif value_type is str or value_type is bytes :
                if value_type is str :
                    value_b = str2bytes(value)
                else :
                    value_b = value
                if only_ATGC(value_b) :    # TODO detect IUPAC
                    value_obitype = 'OBI_SEQ'
                elif len(value) == 1 :
                    value_obitype = 'OBI_CHAR'
                elif (len(value) > 1) :
                    value_obitype = 'OBI_STR'
            # Covers both unsupported Python types and strings for which no
            # type was chosen above, so that add_column() is never called
            # without a type.
            if value_obitype is None :
                raise Exception("Could not guess the type of a value to create a new column")
            self._view.add_column(column_name, type=value_obitype)

        (((self._view)._columns)[column_name]).set_line(self._index, value)

    def __iter__(self):
        # Iterates over the column aliases of the view
        for column_name in ((self._view)._columns) :
            yield column_name

    def __contains__(self, str column_name):
        return (column_name in self._view._columns)

    def __repr__(self):
        # Textual form of a {column alias: value} dictionary for this line
        cdef dict line
        line = {column_name: self[column_name] for column_name in self._view._columns}
        return str(line)
|
|
|
|
|
|
######################################################################################################
|
|
|
|
|
|
cdef class OBIView_line_selection(list):
    """List of line indices selected in a view.

    Keeps the view and its name so that a new view restricted to the
    selected lines can be cloned from it.
    """

    def __init__(self, OBIView view) :
        # Raises Exception if the view has been invalidated (NULL pointer)
        if view._pointer == NULL:
            raise Exception("Error: trying to create a line selection with an invalidated view")
        self._view = view
        self._view_name = view.name

    def append(self, index_t idx) :
        # Add a line index to the selection.
        # Raises Exception if the index is outside the lines of the view.
        if idx >= self._view.line_count :
            raise Exception("Error: trying to select a line beyond the line count of a view")
        # Negative values must be refused too: -1 is the flag ending the C
        # array built from a selection, so a negative index would silently
        # truncate (or corrupt) the selection.
        if idx < 0 :
            raise Exception("Error: trying to select a line with a negative index")
#        if idx in self :    # TODO discuss. Discuss order too
#            pass
#        else :
        super(OBIView_line_selection, self).append(idx)
|
|
|
|
|
|
######################################################################################################
|
|
|
|
|
|
cdef class OBIDMS :
    """Python-side handle on an OBIDMS, giving access to its views and taxonomies."""

    def __init__(self, str dms_name) :
        # Open the DMS named `dms_name`, or create it.
        # Raises Exception if the C function returns NULL.

        # Declarations
        cdef bytes dms_name_b

        # Format the character string to send to C function
        dms_name_b = str2bytes(dms_name)

        # Fill structure and create or open the DMS
        self._pointer = obi_dms(<const_char_p> dms_name_b)
        if self._pointer == NULL :
            raise Exception("Failed opening or creating an OBIDMS")


    # name property getter
    @property
    def name(self):
        return bytes2str(self._pointer.dms_name)


    cpdef close(self) :
        """Close the DMS.

        Raises Exception if the C function reports an error.
        """
        if (obi_close_dms(self._pointer)) < 0 :
            raise Exception("Problem closing an OBIDMS")


    cpdef OBI_Taxonomy open_taxonomy(self, str taxo_name) :
        """Return the taxonomy named `taxo_name` of this DMS."""
        return OBI_Taxonomy(self, taxo_name)


    cpdef OBIView open_view(self, str view_name) :
        """Open an existing view, using the class matching its stored view type."""
        cdef object view_class
        view_class = OBIView.get_view_subclass(self.read_view_infos(view_name)["view_type"])
        return view_class(self, view_name)


    cpdef OBIView new_view(self, str view_name, str view_type="", bint quality_column=False, str comments="") :
        """Create a new view of the given type and return it."""
        cdef object view_class
        # Get right subclass depending on view type
        view_class = OBIView.get_view_subclass(view_type)
        return view_class(self, view_name, new=True, comments=comments, quality_column=quality_column, view_type=view_type)


    cpdef OBIView clone_view(self, str view_name, object view_to_clone, str comments="") :
        """Create a new view cloned from `view_to_clone` (a view name or an OBIView)."""
        cdef object view_class    # @DuplicatedSignature
        cdef str view_type
        # Get right subclass depending on view type
        if isinstance(view_to_clone, str) :
            view_type = self.read_view_infos(view_to_clone)["view_type"]
        else :
            view_type = view_to_clone.type
        view_class = OBIView.get_view_subclass(view_type)
        return view_class(self, view_name, new=True, view_to_clone=view_to_clone, comments=comments, view_type=view_type)


    cpdef OBIView clone_view_with_line_selection(self, str view_name, OBIView_line_selection line_selection, str comments="") :
        """Create a new view cloned from the view of a line selection, restricted to the selected lines."""
        cdef object view_class    # @DuplicatedSignature
        # Get right subclass depending on view type
        view_class = OBIView.get_view_subclass(line_selection._view.type)
        # NOTE(review): unlike clone_view(), the view type is not forwarded
        # here, so the view is created through the generic C function --
        # confirm that this is intended.
        return view_class(self, view_name, new=True, view_to_clone=line_selection._view, line_selection=line_selection, comments=comments)


    cpdef dict read_view_infos(self, str view_name) :
        """Return the informations stored about a view as a dictionary.

        Keys: "name", "comments", "view_type", "column_count", "line_count",
        "created_from", "creation_date", "line_selection" (None when the view
        uses all the lines, else a dict with "column_name" and "version") and
        "column_references" (one dict with "original_name" and "version" per
        column alias).

        Raises Exception if the view informations can not be mapped.
        """

        cdef Obiview_infos_p     view_infos_p
        cdef dict                view_infos_d
        cdef dict                column_references_d
        cdef Alias_column_pair_p column_refs
        cdef int                 j
        cdef str                 column_name

        view_infos_p = obi_view_map_file(self._pointer, str2bytes(view_name))
        # Everything below dereferences the pointer (NULL presumably means
        # that the view does not exist or could not be mapped).
        if view_infos_p == NULL :
            raise Exception("Problem reading the informations of a view")

        # try/finally so that the mapping is released even if a conversion fails
        try :
            view_infos_d = {}
            view_infos_d["name"] = bytes2str(view_infos_p.name)
            view_infos_d["comments"] = bytes2str(view_infos_p.comments)
            view_infos_d["view_type"] = bytes2str(view_infos_p.view_type)
            view_infos_d["column_count"] = <int> view_infos_p.column_count
            view_infos_d["line_count"] = <int> view_infos_p.line_count
            view_infos_d["created_from"] = bytes2str(view_infos_p.created_from)
            view_infos_d["creation_date"] = bytes2str(obi_format_date(view_infos_p.creation_date))
            if (view_infos_p.all_lines) :
                view_infos_d["line_selection"] = None
            else :
                view_infos_d["line_selection"] = {}
                view_infos_d["line_selection"]["column_name"] = bytes2str((view_infos_p.line_selection).column_name)
                view_infos_d["line_selection"]["version"] = <int> (view_infos_p.line_selection).version
            column_references_d = {}
            view_infos_d["column_references"] = column_references_d
            column_refs = view_infos_p.column_references
            for j in range(view_infos_d["column_count"]) :
                column_name = bytes2str((column_refs[j]).alias)
                column_references_d[column_name] = {}
                column_references_d[column_name]["original_name"] = bytes2str((column_refs[j]).column_refs.column_name)
                column_references_d[column_name]["version"] = (column_refs[j]).column_refs.version
        finally :
            obi_view_unmap_file(self._pointer, view_infos_p)

        return view_infos_d
|
|
|
|
|
|
# cpdef dict read_views(self) : # TODO function that prints the dic nicely and function that prints 1 view nicely. Add column type in col ref
|
|
#
|
|
# cdef Obiviews_infos_all_p all_views_p
|
|
# cdef Obiview_infos_p view_p
|
|
# cdef Column_reference_p column_refs
|
|
# cdef int nb_views
|
|
# cdef int i, j
|
|
# cdef str view_name
|
|
# cdef str column_name
|
|
# cdef dict views
|
|
# cdef bytes name_b
|
|
#
|
|
# views = {}
|
|
# all_views_p = obi_read_view_infos(self._pointer)
|
|
# if all_views_p == NULL :
|
|
# raise Exception("No views to read")
|
|
# nb_views = <int> (all_views_p.header).view_count
|
|
# for i in range(nb_views) :
|
|
# view_p = (<Obiview_infos_p> (all_views_p.view_infos)) + i
|
|
# view_name = bytes2str(view_p.name)
|
|
# views[view_name] = {}
|
|
# views[view_name]["comments"] = bytes2str(view_p.comments)
|
|
# views[view_name]["view_type"] = bytes2str(view_p.view_type)
|
|
# views[view_name]["column_count"] = <int> view_p.column_count
|
|
# views[view_name]["line_count"] = <int> view_p.line_count
|
|
# views[view_name]["view_number"] = <int> view_p.view_number
|
|
# views[view_name]["created_from"] = bytes2str(view_p.created_from)
|
|
# views[view_name]["creation_date"] = bytes2str(obi_format_date(view_p.creation_date))
|
|
# if (view_p.all_lines) :
|
|
# views[view_name]["line_selection"] = None
|
|
# else :
|
|
# views[view_name]["line_selection"] = {}
|
|
# views[view_name]["line_selection"]["column_name"] = bytes2str((view_p.line_selection).column_name)
|
|
# views[view_name]["line_selection"]["version"] = <int> (view_p.line_selection).version
|
|
# views[view_name]["column_references"] = {}
|
|
# column_refs = view_p.column_references
|
|
# for j in range(views[view_name]["column_count"]) :
|
|
# column_name = bytes2str((column_refs[j]).column_name)
|
|
# views[view_name]["column_references"][column_name] = {}
|
|
# views[view_name]["column_references"][column_name]["version"] = column_refs[j].version
|
|
#
|
|
# obi_close_view_infos(all_views_p);
|
|
#
|
|
# return views
|
|
|
|
|
|
|
|
|