Files
obitools3/python/obitools3/obidms/_obidms.pyx

791 lines
30 KiB
Cython
Raw Normal View History

#cython: language_level=3
2015-05-26 14:17:08 +02:00
2015-09-28 13:51:35 +02:00
from obitools3.utils cimport bytes2str, str2bytes
from .capi.obidms cimport obi_dms, \
obi_close_dms
2016-02-18 10:38:51 +01:00
from .capi.obidmscolumn cimport obi_close_column, \
2016-02-18 10:38:51 +01:00
OBIDMS_column_p, \
OBIDMS_column_header_p
from .capi.obiutils cimport obi_format_date
from .capi.obialign cimport obi_align_one_column
from .capi.obitypes cimport const_char_p, \
OBIType_t, \
OBI_INT, \
OBI_FLOAT, \
OBI_BOOL, \
OBI_CHAR, \
OBI_QUAL, \
OBI_STR, \
OBI_SEQ, \
2016-02-18 10:38:51 +01:00
name_data_type, \
only_ATGC # discuss
from ._obidms cimport OBIDMS, \
OBIDMS_column, \
OBIView, \
OBIView_line
from ._obitaxo cimport OBI_Taxonomy
from ._obiseq cimport OBI_Nuc_Seq, OBI_Nuc_Seq_Stored
2015-09-28 13:51:35 +02:00
from ._obidmscolumn_int cimport OBIDMS_column_int, \
2016-02-18 10:38:51 +01:00
OBIDMS_column_multi_elts_int
2015-09-28 13:51:35 +02:00
from ._obidmscolumn_float cimport OBIDMS_column_float, \
2016-02-18 10:38:51 +01:00
OBIDMS_column_multi_elts_float
2015-09-28 13:51:35 +02:00
from ._obidmscolumn_bool cimport OBIDMS_column_bool, \
2016-02-18 10:38:51 +01:00
OBIDMS_column_multi_elts_bool
2015-09-28 13:51:35 +02:00
from ._obidmscolumn_char cimport OBIDMS_column_char, \
2016-02-18 10:38:51 +01:00
OBIDMS_column_multi_elts_char
2015-09-28 13:51:35 +02:00
from ._obidmscolumn_qual cimport OBIDMS_column_qual, \
OBIDMS_column_multi_elts_qual
from ._obidmscolumn_str cimport OBIDMS_column_str, \
2016-02-18 10:38:51 +01:00
OBIDMS_column_multi_elts_str
from ._obidmscolumn_seq cimport OBIDMS_column_seq, \
2016-02-18 10:38:51 +01:00
OBIDMS_column_multi_elts_seq
from .capi.obiview cimport Obiview_p, \
Obiview_infos_p, \
Alias_column_pair_p, \
obi_new_view_nuc_seqs, \
2016-02-18 10:38:51 +01:00
obi_new_view, \
obi_new_view_cloned_from_name, \
obi_new_view_nuc_seqs_cloned_from_name, \
obi_view_map_file, \
obi_view_unmap_file, \
2016-02-18 10:38:51 +01:00
obi_open_view, \
obi_view_delete_column, \
obi_view_add_column, \
obi_view_create_column_alias, \
obi_view_get_column, \
2016-02-18 10:38:51 +01:00
obi_view_get_pointer_on_column_in_view, \
obi_select_line, \
obi_select_lines, \
obi_save_and_close_view, \
VIEW_TYPE_NUC_SEQS, \
NUC_SEQUENCE_COLUMN, \
ID_COLUMN, \
DEFINITION_COLUMN, \
QUALITY_COLUMN
2016-02-18 10:38:51 +01:00
from libc.stdlib cimport malloc
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer
2016-02-18 10:38:51 +01:00
cdef class OBIDMS_column :
    # Python-side wrapper around one column of a view.
    #
    # The wrapper stores a pointer on the view's own column pointer
    # (OBIDMS_column_p*) rather than the column pointer itself, and always
    # dereferences it with (self.pointer)[0] when reading the header.
    #
    # Should only be initialized through a subclass: get_line() and set_line(),
    # used by __getitem__/__setitem__/__iter__, are not defined in this class body
    # (presumably provided by the typed subclasses -- confirm in the .pxd).

    def __init__(self, OBIView view, str column_name):
        # Bind this object to the column known as `column_name` in `view` and
        # cache the header fields that are exposed as Python attributes.
        #
        # Raises Exception if the view returns no pointer for that column.

        cdef OBIDMS_column_p column_p
        cdef OBIDMS_column_p* column_pp

        column_pp = obi_view_get_pointer_on_column_in_view(view.pointer, str2bytes(column_name))
        # Guard before dereferencing: a NULL here would otherwise crash the interpreter
        if column_pp == NULL or column_pp[0] == NULL :
            raise Exception("Problem getting a column from a view")
        column_p = column_pp[0] # TODO ugly cython dereferencing but can't find better

        # Fill structure
        self.pointer = column_pp
        self.dms = view.dms
        self.view = view
        self.data_type = bytes2str(name_data_type((column_p.header).returned_data_type))
        self.column_name = bytes2str((column_p.header).name)
        self.nb_elements_per_line = (column_p.header).nb_elements_per_line
        # Element names are stored in the header as one ';'-separated string
        self.elements_names = (bytes2str((column_p.header).elements_names)).split(';')

    def __setitem__(self, index_t line_nb, object value):
        self.set_line(line_nb, value)

    def __getitem__(self, index_t line_nb):
        return self.get_line(line_nb)

    def __len__(self):
        # Number of lines used, as recorded in the column header
        return (self.pointer)[0].header.lines_used

    def __sizeof__(self):
        # Header size plus data size, both read from the column header
        return ((self.pointer)[0].header.header_size + (self.pointer)[0].header.data_size)

    def __iter__(self):
        # Declarations
        cdef index_t lines_used
        cdef index_t line_nb
        # Yield each line (the line count is read once, before iterating)
        lines_used = (self.pointer)[0].header.lines_used
        for line_nb in range(lines_used):
            yield self.get_line(line_nb)

    cpdef list get_elements_names(self):
        return self.elements_names

    cpdef str get_data_type(self):
        return self.data_type

    cpdef index_t get_nb_lines_used(self):
        return (self.pointer)[0].header.lines_used

    cpdef str get_creation_date(self):
        return bytes2str(obi_format_date((self.pointer)[0].header.creation_date))

    cpdef str get_comments(self):
        return bytes2str((self.pointer)[0].header.comments)

    def __str__(self) :
        # One line of text per column line; joined once instead of growing a
        # string inside the loop.
        return "".join([str(line) + "\n" for line in self])

    def __repr__(self) :
        return (self.column_name + ", version " + str((self.pointer)[0].header.version) + ", data type: " + self.data_type)

    cpdef close(self):
        # Raises Exception if the C layer reports an error while closing the column
        if obi_close_column((self.pointer)[0]) < 0 :
            raise Exception("Problem closing a column")

    @staticmethod
    cdef object get_subclass_type(OBIDMS_column_p column_p) :
        # Return the Python class to instantiate for the given C column, chosen
        # from the data type returned by the column and from whether it stores
        # one or several elements per line.
        #
        # Raises Exception if the data type is none of the handled OBI types.

        cdef object subclass
        cdef OBIDMS_column_header_p header
        cdef OBIType_t col_type
        cdef bint col_one_element_per_line

        header = column_p.header
        col_type = header.returned_data_type
        col_one_element_per_line = ((header.nb_elements_per_line) == 1)

        if col_type == OBI_INT :
            if col_one_element_per_line :
                subclass = OBIDMS_column_int
            else :
                subclass = OBIDMS_column_multi_elts_int
        elif col_type == OBI_FLOAT :
            if col_one_element_per_line :
                subclass = OBIDMS_column_float
            else :
                subclass = OBIDMS_column_multi_elts_float
        elif col_type == OBI_BOOL :
            if col_one_element_per_line :
                subclass = OBIDMS_column_bool
            else :
                subclass = OBIDMS_column_multi_elts_bool
        elif col_type == OBI_CHAR :
            if col_one_element_per_line :
                subclass = OBIDMS_column_char
            else :
                subclass = OBIDMS_column_multi_elts_char
        elif col_type == OBI_QUAL :
            if col_one_element_per_line :
                subclass = OBIDMS_column_qual
            else :
                subclass = OBIDMS_column_multi_elts_qual
        elif col_type == OBI_STR :
            if col_one_element_per_line :
                subclass = OBIDMS_column_str
            else :
                subclass = OBIDMS_column_multi_elts_str
        elif col_type == OBI_SEQ :
            if col_one_element_per_line :
                subclass = OBIDMS_column_seq
            else :
                subclass = OBIDMS_column_multi_elts_seq
        else :
            raise Exception("Problem with the data type")

        return subclass
######################################################################################################
cdef class OBIDMS_column_multi_elts(OBIDMS_column) :
    # Column variant whose lines are addressed element by element, by element name.

    def __getitem__(self, index_t line_nb):
        # Indexing returns a proxy object on the line, not the stored values
        cdef object line_proxy
        line_proxy = OBIDMS_column_line(self, line_nb)
        return line_proxy

    cpdef set_line(self, index_t line_nb, dict values):
        # Store each entry of `values` (element name -> value) in line `line_nb`,
        # one set_item() call per element name.
        cdef object element_value
        for element_name in values :
            element_value = values[element_name]
            self.set_item(line_nb, element_name, element_value)
######################################################################################################
cdef class OBIDMS_column_line :
    # Proxy on a single line of a column: it only remembers the column and the
    # line index, and forwards every access to the column.

    def __init__(self, OBIDMS_column column, index_t line_nb) :
        self.column = column
        self.index = line_nb

    def __getitem__(self, str element_name) :
        # Read one element of the line through the column
        element_value = self.column.get_item(self.index, element_name)
        return element_value

    def __setitem__(self, str element_name, object value):
        # Write one element of the line through the column
        line_index = self.index
        self.column.set_item(line_index, element_name, value)

    def __contains__(self, str element_name):
        # Membership is tested against the element names of the column
        known_names = self.column.elements_names
        return element_name in known_names

    def __repr__(self) :
        # Text form of whatever the column returns for the whole line
        whole_line = self.column.get_line(self.index)
        return str(whole_line)
##########################################
cdef class OBIView :
    # Python-side wrapper around a C view (Obiview_p).
    #
    # self.columns maps each column alias of the view to the Python column
    # object wrapping it. A view can be indexed by column alias (str) to get a
    # column, or by line number (int) to get an OBIView_line.

    def __init__(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, list line_selection=None, str comments="", bint quality_column=False):
        # Create a new view (new=True) or open an existing one (new=False).
        #
        # dms            : the OBIDMS the view belongs to
        # view_name      : name of the view to create or open
        # view_to_clone  : None, the name (str) of a view to clone, or an OBIView to clone
        # line_selection : optional list of line numbers handed to the C layer
        # comments       : comments passed to the C layer when creating the view
        # quality_column : not used by this class
        #
        # Raises MemoryError if the line selection array can not be allocated,
        # and Exception if the C layer fails to create/open the view.

        # TODO quality_column is only here because it's needed for OBIView_NUC_SEQS views, not clean

        cdef Obiview_p view = NULL
        cdef int i
        cdef str col_name
        cdef OBIDMS_column_p column_p
        cdef index_t* line_selection_p

        self.dms = dms

        # Create the C array for the line selection if needed (terminated by -1)
        if line_selection is not None :
            line_selection_p = <index_t*> malloc((len(line_selection) + 1) * sizeof(index_t))
            if line_selection_p == NULL :
                raise MemoryError("Could not allocate the line selection array")
            for i in range(len(line_selection)) :
                line_selection_p[i] = line_selection[i]
            line_selection_p[len(line_selection)] = -1
            # NOTE(review): this array is never freed here; confirm whether the C
            # functions below take ownership of it, otherwise it leaks.
        else :
            line_selection_p = NULL

        # Create the view if needed
        if new :
            if view_to_clone is None :
                view = obi_new_view(dms.pointer, str2bytes(view_name), NULL, line_selection_p, str2bytes(comments))
            elif isinstance(view_to_clone, str) :
                view = obi_new_view_cloned_from_name(dms.pointer, str2bytes(view_name), str2bytes(view_to_clone), line_selection_p, str2bytes(comments))
            else :
                view = obi_new_view(dms.pointer, str2bytes(view_name), (<OBIView> view_to_clone).pointer, line_selection_p, str2bytes(comments))
        # Else, open the existing view
        else :
            if view_name is not None :
                view = obi_open_view(dms.pointer, str2bytes(view_name))
            else :
                view = obi_open_view(dms.pointer, NULL) # TODO discuss

        if view == NULL :
            raise Exception("Error creating/opening a view")

        self.pointer = view
        self.name = bytes2str(view.infos.name)
        # __repr__ prints self.comments, so it has to be filled for plain views too
        self.comments = bytes2str(view.infos.comments)

        # Go through columns to build list of corresponding python instances
        self.columns = {}
        for i in range(view.infos.column_count) :
            column_p = <OBIDMS_column_p> (view.columns)[i]
            col_name = bytes2str(view.infos.column_references[i].alias)
            subclass = OBIDMS_column.get_subclass_type(column_p)
            self.columns[col_name] = subclass(self, col_name)

    def __repr__(self) :
        # Name, comments and line count of the view, then one line per column
        cdef str s
        s = str(self.name) + "\n" + str(self.comments) + "\n" + str(self.pointer.infos.line_count) + " lines\n"
        for column_name in self.columns :
            s = s + column_name + ": " + repr(self.columns[column_name]) + '\n'
        return s

    cpdef delete_column(self, str column_name) :
        # Remove the column from the C view, then from the dictionary of column
        # objects. Raises Exception if the C layer reports an error.
        if obi_view_delete_column(self.pointer, str2bytes(column_name)) < 0 :
            raise Exception("Problem deleting a column from a view")
        # Update the dictionary of column objects:
        (self.columns).pop(column_name)
        # The remaining column objects get their pointers refreshed
        self.update_column_pointers()

    cpdef add_column(self,
                     str column_name,
                     obiversion_t version_number=-1,
                     str alias='',
                     str type='',
                     index_t nb_lines=0,
                     index_t nb_elements_per_line=1,
                     list elements_names=None,
                     str indexer_name="",
                     str associated_column_name="",
                     obiversion_t associated_column_version=-1,
                     str comments="",
                     bint create=True
                    ) :
        # Add a column to the view through the C layer and register the
        # corresponding Python column object under its alias.
        #
        # alias          : defaults to column_name when left empty
        # type           : one of 'OBI_INT', 'OBI_FLOAT', 'OBI_BOOL', 'OBI_CHAR',
        #                  'OBI_QUAL', 'OBI_STR', 'OBI_SEQ'
        # elements_names : names of the elements, joined with ';' when there are
        #                  several elements per line; the column name is used when
        #                  there is exactly one element per line
        # All other arguments are forwarded as they are to obi_view_add_column().
        #
        # Raises Exception if the type name is not recognized or if the C layer
        # fails to add the column.

        cdef bytes column_name_b
        cdef bytes elements_names_b
        cdef object subclass
        cdef OBIDMS_column_p column_p

        column_name_b = str2bytes(column_name)
        if alias == '' :
            alias = column_name
            alias_b = column_name_b
        else :
            alias_b = str2bytes(alias)

        # NOTE(review): elements_names_b is not assigned when nb_elements_per_line < 1,
        # and elements_names must be a list when nb_elements_per_line > 1.
        if nb_elements_per_line > 1 :
            elements_names_b = str2bytes(';'.join(elements_names))
        elif nb_elements_per_line == 1 :
            elements_names_b = column_name_b

        # NOTE(review): when `type` is left empty, data_type is never assigned
        # before being handed to obi_view_add_column() -- confirm intended usage.
        if type :
            if type == 'OBI_INT' :
                data_type = OBI_INT
            elif type == 'OBI_FLOAT' :
                data_type = OBI_FLOAT
            elif type == 'OBI_BOOL' :
                data_type = OBI_BOOL
            elif type == 'OBI_CHAR' :
                data_type = OBI_CHAR
            elif type == 'OBI_QUAL' :
                data_type = OBI_QUAL
            elif type == 'OBI_STR' :
                data_type = OBI_STR
            elif type == 'OBI_SEQ' :
                data_type = OBI_SEQ
            else :
                raise Exception("Invalid provided data type")

        if (obi_view_add_column(self.pointer, column_name_b, version_number, alias_b, # TODO should return pointer on column?
                                data_type, nb_lines, nb_elements_per_line,
                                elements_names_b, str2bytes(indexer_name),
                                str2bytes(associated_column_name), associated_column_version,
                                str2bytes(comments), create) < 0) :
            raise Exception("Problem adding a column in a view")

        # Get the column pointer
        column_p = obi_view_get_column(self.pointer, alias_b)

        # Open and store the subclass
        subclass = OBIDMS_column.get_subclass_type(column_p)
        (self.columns)[alias] = subclass(self, alias)

    cpdef change_column_alias(self, str current_alias, str new_alias):
        # Rename a column alias in the C view and move the column object to the
        # new key. Raises Exception if the C layer reports an error.
        if (obi_view_create_column_alias(self.pointer, str2bytes(current_alias), str2bytes(new_alias)) < 0) :
            raise Exception("Problem changing a column alias")
        # Update the dictionary of column objects; pop-then-store keeps the
        # column when both aliases are identical
        self.columns[new_alias] = (self.columns).pop(current_alias)

    cpdef update_column_pointers(self):
        # Re-read, for every registered column object, the pointer kept by the C view
        cdef str column_n
        cdef OBIDMS_column column
        for column_n in self.columns :
            column = self.columns[column_n]
            column.pointer = <OBIDMS_column_p*> obi_view_get_pointer_on_column_in_view(self.pointer, str2bytes(column_n))

    cpdef save_and_close(self) :
        # Raises Exception if the C layer reports an error
        if (obi_save_and_close_view(self.pointer) < 0) :
            raise Exception("Problem closing a view")

    def __iter__(self):
        # iter on each line of all columns
        # Declarations
        cdef index_t lines_used
        cdef index_t line_nb
        cdef OBIView_line line # TODO Check that this works for NUC SEQ views
        # Yield each line
        lines_used = self.pointer.infos.line_count
        for line_nb in range(lines_used) :
            line = self[line_nb]
            yield line

    def __getitem__(self, object item) :
        # str -> the column object registered under that alias
        # int -> an OBIView_line on that line number
        # Anything else is rejected instead of silently returning None.
        if isinstance(item, str) :
            return (self.columns)[item]
        elif isinstance(item, int) :
            return OBIView_line(self, item)
        raise TypeError("Views are indexed by column name (str) or line number (int)")

    cpdef select_line(self, index_t line_nb) :
        # Raises Exception if the C layer reports an error
        if obi_select_line(self.pointer, line_nb) < 0 :
            raise Exception("Problem selecting a line")

    cpdef select_lines(self, list line_selection) :
        # Select several lines at once: the list is copied into a C array
        # terminated by -1 and handed to obi_select_lines().
        #
        # Raises MemoryError if the array can not be allocated, and Exception if
        # the C layer reports an error.
        cdef index_t* line_selection_p
        cdef int i
        line_selection_p = <index_t*> malloc((len(line_selection) + 1) * sizeof(index_t))
        if line_selection_p == NULL :
            raise MemoryError("Could not allocate the line selection array")
        for i in range(len(line_selection)) :
            line_selection_p[i] = line_selection[i]
        line_selection_p[len(line_selection)] = -1
        # NOTE(review): this array is never freed here; confirm whether
        # obi_select_lines() takes ownership of it, otherwise it leaks.
        if obi_select_lines(self.pointer, line_selection_p) < 0 :
            raise Exception("Problem selecting a list of lines")

    def __contains__(self, str column_name):
        return (column_name in self.columns)

    def __len__(self):
        # Line count as recorded in the view informations
        return(self.pointer.infos.line_count)

    def __str__(self) :
        # One line of text per line of the view; joined once instead of growing
        # a string inside the loop.
        return "".join([str(line) + "\n" for line in self])
2015-09-28 13:51:35 +02:00
2016-02-18 10:38:51 +01:00
#############################################
cdef class OBIView_NUC_SEQS(OBIView):
    # View specialized for nucleotide sequences: it is created through the
    # obi_new_view_nuc_seqs* C functions, and indexing it by line number returns
    # an OBI_Nuc_Seq_Stored instead of an OBIView_line.

    def __init__(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, list line_selection=None, str comments="", bint quality_column=False):
        # Create a new nucleotide sequence view (new=True) or open an existing
        # view (new=False).
        #
        # dms            : the OBIDMS the view belongs to
        # view_name      : name of the view to create or open
        # view_to_clone  : None, the name (str) of a view to clone, or an OBIView to clone
        # line_selection : optional list of line numbers handed to the C layer
        # comments       : comments passed to the C layer when creating the view
        # quality_column : forwarded to the C functions creating the view
        #
        # Raises MemoryError if the line selection array can not be allocated,
        # and Exception if the C layer fails to create/open the view.

        cdef Obiview_p view = NULL
        cdef int i
        cdef str col_name
        cdef OBIDMS_column_p column_p
        cdef index_t* line_selection_p

        self.dms = dms

        # Create the C array for the line selection if needed (terminated by -1)
        if line_selection is not None :
            line_selection_p = <index_t*> malloc((len(line_selection) + 1) * sizeof(index_t))
            if line_selection_p == NULL :
                raise MemoryError("Could not allocate the line selection array")
            for i in range(len(line_selection)) :
                line_selection_p[i] = line_selection[i]
            line_selection_p[len(line_selection)] = -1
            # NOTE(review): this array is never freed here; confirm whether the C
            # functions below take ownership of it, otherwise it leaks.
        else :
            line_selection_p = NULL

        # Create the view if needed
        if new :
            if view_to_clone is None :
                view = obi_new_view_nuc_seqs(dms.pointer, str2bytes(view_name), NULL, line_selection_p, str2bytes(comments), quality_column)
            elif isinstance(view_to_clone, str) :
                view = obi_new_view_nuc_seqs_cloned_from_name(dms.pointer, str2bytes(view_name), str2bytes(view_to_clone), line_selection_p, str2bytes(comments), quality_column)
            else :
                view = obi_new_view_nuc_seqs(dms.pointer, str2bytes(view_name), (<OBIView> view_to_clone).pointer, line_selection_p, str2bytes(comments), quality_column)
        # Else, open the existing view
        else :
            if view_name is not None :
                view = obi_open_view(dms.pointer, str2bytes(view_name))
            else :
                view = obi_open_view(dms.pointer, NULL)

        if view == NULL :
            raise Exception("Error creating/opening view")

        self.pointer = view
        self.name = bytes2str(view.infos.name)
        self.comments = bytes2str(view.infos.comments)

        # Go through columns to build list of corresponding python instances
        self.columns = {}
        for i in range(view.infos.column_count) :
            column_p = <OBIDMS_column_p> (view.columns)[i]
            col_name = bytes2str(view.infos.column_references[i].alias)
            subclass = OBIDMS_column.get_subclass_type(column_p)
            self.columns[col_name] = subclass(self, col_name)

    def __getitem__(self, object item) :
        # str -> the column object registered under that alias
        # int -> an OBI_Nuc_Seq_Stored on that line number
        # Anything else is rejected instead of silently returning None.
        if isinstance(item, str) :
            return (self.columns)[item]
        elif isinstance(item, int) :
            return OBI_Nuc_Seq_Stored(self, item)
        raise TypeError("Views are indexed by column name (str) or line number (int)")

    def __setitem__(self, index_t line_idx, OBI_Nuc_Seq sequence_obj) :
        # Copy every key of the sequence object into the line `line_idx` of the view
        for key in sequence_obj :
            self[line_idx][key] = sequence_obj[key]

    # TODO
    cpdef align(self, OBIView oview, OBIView iview2=None,
                double threshold=0.0, bint normalize=True, int reference=0, bint similarity_mode=True) :
        # Align the sequence column of this view through obi_align_one_column()
        # and write the results in `oview`, in three columns added here:
        # "ID1" and "ID2" (OBI_STR) and "score" (OBI_FLOAT).
        #
        # iview2 is accepted but not used by this implementation.
        # threshold, normalize, reference and similarity_mode are forwarded as
        # they are to the C function.
        #
        # Raises Exception if the C alignment function reports an error.

        cdef Obiview_p iview1_p
        cdef Obiview_p oview_p

        cdef OBIDMS_column icol1
        cdef OBIDMS_column_p icol1_p

        cdef OBIDMS_column id1_col
        cdef OBIDMS_column_p id1_col_p

        cdef OBIDMS_column id2_col
        cdef OBIDMS_column_p id2_col_p

        cdef OBIDMS_column ocol
        cdef OBIDMS_column_p ocol_p

        cdef str id1_col_name
        cdef str id2_col_name
        cdef str score_col_name

        id1_col_name = "ID1" # TODO discuss names, aliases
        id2_col_name = "ID2"
        score_col_name = "score"

        # Input: the nucleotide sequence column of this view
        iview1_p = self.pointer
        icol1 = self[bytes2str(NUC_SEQUENCE_COLUMN)]
        icol1_p = (icol1.pointer)[0]

        # Output columns are all added before any of their pointers is read
        oview.add_column(id1_col_name, type='OBI_STR', create=True)
        oview.add_column(id2_col_name, type='OBI_STR', create=True)
        oview.add_column(score_col_name, type='OBI_FLOAT', create=True)

        oview_p = oview.pointer
        ocol = oview[score_col_name]
        ocol_p = (ocol.pointer)[0]

        id1_col = oview[id1_col_name]
        id2_col = oview[id2_col_name]
        id1_col_p = (id1_col.pointer)[0]
        id2_col_p = (id2_col.pointer)[0]

        if obi_align_one_column(iview1_p, icol1_p, oview_p, id1_col_p, id2_col_p, ocol_p, threshold, normalize, reference, similarity_mode) < 0 :
            raise Exception("Error aligning sequences")
#############################################
2016-02-18 10:38:51 +01:00
cdef class OBIView_line :
    # Proxy on one line of a view: it only remembers the view and the line
    # index, and reads/writes values through the column objects of the view.

    def __init__(self, OBIView view, index_t line_nb) :
        self.index = line_nb
        self.view = view

    def __getitem__(self, str column_name) :
        # Value of this line in the given column
        return ((self.view).columns)[column_name][self.index]

    def __setitem__(self, str column_name, object value):
        # Set the value of this line in the given column. If the view has no
        # such column yet, the column is first created with a type guessed from
        # the exact Python type of `value`:
        #   int -> OBI_INT, float -> OBI_FLOAT, bool -> OBI_BOOL,
        #   str -> OBI_SEQ if only_ATGC() accepts it, else OBI_CHAR for a single
        #          character, else OBI_STR
        #
        # Raises Exception if the column has to be created and the type can not
        # be guessed (None or unsupported Python type).

        # TODO detect multiple elements (dict type)? put somewhere else? but more risky (in get)
        # TODO OBI_QUAL ?

        cdef type value_type
        cdef str value_obitype

        if column_name not in self.view :
            # Identity test: '==' could be overridden by the value's own type
            if value is None :
                raise Exception("Trying to create a column from a None value (can't guess type)")
            value_type = type(value)
            # Exact type comparisons on purpose: bool is a subclass of int and
            # must not end up in an OBI_INT column
            if value_type == int :
                value_obitype = 'OBI_INT'
            elif value_type == float :
                value_obitype = 'OBI_FLOAT'
            elif value_type == bool :
                value_obitype = 'OBI_BOOL'
            elif value_type == str :
                if only_ATGC(str2bytes(value)) : # TODO detect IUPAC?
                    value_obitype = 'OBI_SEQ'
                elif len(value) == 1 :
                    value_obitype = 'OBI_CHAR'
                else :
                    # Every other string, so that value_obitype is always assigned
                    value_obitype = 'OBI_STR'
            else :
                raise Exception("Could not guess the type of a value to create a new column")
            self.view.add_column(column_name, type=value_obitype)

        (((self.view).columns)[column_name]).set_line(self.index, value)

    def __iter__(self):
        # Iterating on a line yields the column names of the view
        for column_name in ((self.view).columns) :
            yield column_name

    def __contains__(self, str column_name):
        return (column_name in self.view.columns)

    def __repr__(self):
        # Text form of a dictionary mapping each column name to the value of this line
        cdef dict line
        cdef str column_name
        line = {}
        for column_name in self.view.columns :
            line[column_name] = self[column_name]
        return str(line)
2016-02-18 10:38:51 +01:00
##########################################
2016-02-18 10:38:51 +01:00
cdef class OBIDMS :
    # Python-side wrapper around a C OBIDMS, giving access to its views and taxonomies.

    def __init__(self, str dms_name) :
        # Create or open the DMS called `dms_name`.
        # Raises Exception if the C layer returns no DMS.

        # Declarations
        cdef bytes dms_name_b

        # Format the character string to send to C function
        dms_name_b = str2bytes(dms_name)

        # Fill structure and create or open the DMS
        self.dms_name = dms_name
        self.pointer = obi_dms(<const_char_p> dms_name_b)
        if self.pointer == NULL :
            raise Exception("Failed opening or creating an OBIDMS")

    cpdef close(self) :
        # Raises Exception if the C layer reports an error
        if (obi_close_dms(self.pointer)) < 0 :
            raise Exception("Problem closing an OBIDMS")

    cpdef OBI_Taxonomy open_taxonomy(self, str taxo_name) :
        return OBI_Taxonomy(self, taxo_name)

    cpdef OBIView open_view(self, str view_name) :
        # Open an existing view, using OBIView_NUC_SEQS when the view file
        # declares the nucleotide sequences view type and OBIView otherwise.
        cdef object view_class
        cdef dict view_infos
        view_infos = self.read_view_infos(view_name)
        if view_infos["view_type"] == bytes2str(VIEW_TYPE_NUC_SEQS) :
            view_class = OBIView_NUC_SEQS
        else :
            view_class = OBIView
        return view_class(self, view_name)

    cpdef OBIView new_view(self, str view_name, object view_to_clone=None, list line_selection=None, str view_type=None, str comments="", bint quality_column=False) :
        # Create a new view, optionally cloned from another one.
        #
        # view_to_clone : None, the name (str) of a view to clone, or a view object
        # view_type     : optional view type name; the nucleotide sequences type
        #                 selects OBIView_NUC_SEQS
        # Other arguments are forwarded to the constructor of the view class.
        #
        # The class defaults to OBIView, and is OBIView_NUC_SEQS when either the
        # requested type or the view to clone is a nucleotide sequences view.

        cdef object view_class
        cdef str nuc_seqs_type

        # Converted once: every comparison below is done between str objects
        nuc_seqs_type = bytes2str(VIEW_TYPE_NUC_SEQS)

        # Get right subclass depending on view type; the default guarantees
        # view_class is always assigned
        view_class = OBIView
        if view_type is not None and view_type == nuc_seqs_type :
            view_class = OBIView_NUC_SEQS

        # Check the type of the view to clone if there is one    # TODO make generic for future other view types
        if view_to_clone is not None :
            if isinstance(view_to_clone, OBIView_NUC_SEQS) :
                view_class = OBIView_NUC_SEQS
            elif isinstance(view_to_clone, str) and self.read_view_infos(view_to_clone)["view_type"] == nuc_seqs_type :
                view_class = OBIView_NUC_SEQS

        return view_class(self, view_name, new=True, view_to_clone=view_to_clone, line_selection=line_selection, comments=comments, quality_column=quality_column)

    cpdef dict read_view_infos(self, str view_name) :
        # Map the informations file of a view, copy its content in a Python
        # dictionary and unmap the file.
        #
        # Keys of the returned dictionary: "name", "comments", "view_type",
        # "column_count", "line_count", "created_from", "creation_date",
        # "line_selection" (None when the view uses all lines, else a dictionary
        # with "column_name" and "version") and "column_references" (one entry per
        # column alias, with "original_name" and "version").
        #
        # Raises Exception if the view file can not be mapped.

        cdef Obiview_infos_p view_infos_p
        cdef dict view_infos_d
        cdef dict column_references_d
        cdef int j
        cdef str column_name

        view_infos_p = obi_view_map_file(self.pointer, str2bytes(view_name))
        # Guard before dereferencing: a NULL here would otherwise crash the interpreter
        if view_infos_p == NULL :
            raise Exception("Error reading the informations of a view")

        # The file is unmapped even if a conversion fails along the way
        try :
            view_infos_d = {}
            view_infos_d["name"] = bytes2str(view_infos_p.name)
            view_infos_d["comments"] = bytes2str(view_infos_p.comments)
            view_infos_d["view_type"] = bytes2str(view_infos_p.view_type)
            view_infos_d["column_count"] = <int> view_infos_p.column_count
            view_infos_d["line_count"] = <int> view_infos_p.line_count
            view_infos_d["created_from"] = bytes2str(view_infos_p.created_from)
            view_infos_d["creation_date"] = bytes2str(obi_format_date(view_infos_p.creation_date))
            if (view_infos_p.all_lines) :
                view_infos_d["line_selection"] = None
            else :
                view_infos_d["line_selection"] = {}
                view_infos_d["line_selection"]["column_name"] = bytes2str((view_infos_p.line_selection).column_name)
                view_infos_d["line_selection"]["version"] = <int> (view_infos_p.line_selection).version
            column_references_d = {}
            view_infos_d["column_references"] = column_references_d
            for j in range(view_infos_d["column_count"]) :
                column_name = bytes2str(view_infos_p.column_references[j].alias)
                column_references_d[column_name] = {}
                column_references_d[column_name]["original_name"] = bytes2str(view_infos_p.column_references[j].column_refs.column_name)
                column_references_d[column_name]["version"] = view_infos_p.column_references[j].column_refs.version
        finally :
            obi_view_unmap_file(self.pointer, view_infos_p)

        return view_infos_d
# cpdef dict read_views(self) : # TODO function that prints the dic nicely and function that prints 1 view nicely. Add column type in col ref
#
# cdef Obiviews_infos_all_p all_views_p
# cdef Obiview_infos_p view_p
# cdef Column_reference_p column_refs
# cdef int nb_views
# cdef int i, j
# cdef str view_name
# cdef str column_name
# cdef dict views
# cdef bytes name_b
#
# views = {}
# all_views_p = obi_read_view_infos(self.pointer)
# if all_views_p == NULL :
# raise Exception("No views to read")
# nb_views = <int> (all_views_p.header).view_count
# for i in range(nb_views) :
# view_p = (<Obiview_infos_p> (all_views_p.view_infos)) + i
# view_name = bytes2str(view_p.name)
# views[view_name] = {}
# views[view_name]["comments"] = bytes2str(view_p.comments)
# views[view_name]["view_type"] = bytes2str(view_p.view_type)
# views[view_name]["column_count"] = <int> view_p.column_count
# views[view_name]["line_count"] = <int> view_p.line_count
# views[view_name]["view_number"] = <int> view_p.view_number
# views[view_name]["created_from"] = bytes2str(view_p.created_from)
# views[view_name]["creation_date"] = bytes2str(obi_format_date(view_p.creation_date))
# if (view_p.all_lines) :
# views[view_name]["line_selection"] = None
# else :
# views[view_name]["line_selection"] = {}
# views[view_name]["line_selection"]["column_name"] = bytes2str((view_p.line_selection).column_name)
# views[view_name]["line_selection"]["version"] = <int> (view_p.line_selection).version
# views[view_name]["column_references"] = {}
# column_refs = view_p.column_references
# for j in range(views[view_name]["column_count"]) :
# column_name = bytes2str((column_refs[j]).column_name)
# views[view_name]["column_references"][column_name] = {}
# views[view_name]["column_references"][column_name]["version"] = column_refs[j].version
#
# obi_close_view_infos(all_views_p);
#
# return views