Files
obitools3/python/obitools3/obidms/_obidms.pyx
2016-06-10 10:34:47 +02:00

675 lines
25 KiB
Cython

#cython: language_level=3
from obitools3.utils cimport bytes2str, str2bytes
from .capi.obidms cimport obi_dms, \
obi_close_dms
from .capi.obidmscolumn cimport obi_close_column, \
OBIDMS_column_p, \
OBIDMS_column_header_p
from .capi.obiutils cimport obi_format_date
from .capi.obitypes cimport const_char_p, \
OBIType_t, \
OBI_INT, \
OBI_FLOAT, \
OBI_BOOL, \
OBI_CHAR, \
OBI_QUAL, \
OBI_STR, \
OBI_SEQ, \
name_data_type, \
only_ATGC # discuss
from ._obidms cimport OBIDMS, \
OBIDMS_column, \
OBIView, \
OBIView_line
from ._obitaxo cimport OBI_Taxonomy
from ._obiseq cimport OBI_Nuc_Seq, OBI_Nuc_Seq_Stored
from ._obidmscolumn_int cimport OBIDMS_column_int, \
OBIDMS_column_multi_elts_int
from ._obidmscolumn_float cimport OBIDMS_column_float, \
OBIDMS_column_multi_elts_float
from ._obidmscolumn_bool cimport OBIDMS_column_bool, \
OBIDMS_column_multi_elts_bool
from ._obidmscolumn_char cimport OBIDMS_column_char, \
OBIDMS_column_multi_elts_char
from ._obidmscolumn_qual cimport OBIDMS_column_qual, \
OBIDMS_column_multi_elts_qual
from ._obidmscolumn_str cimport OBIDMS_column_str, \
OBIDMS_column_multi_elts_str
from ._obidmscolumn_seq cimport OBIDMS_column_seq, \
OBIDMS_column_multi_elts_seq
from .capi.obiview cimport Obiview_p, \
Obiviews_infos_all_p, \
Obiview_infos_p, \
Column_reference_p, \
obi_new_view_nuc_seqs, \
obi_new_view, \
obi_new_view_cloned_from_name, \
obi_new_view_nuc_seqs_cloned_from_name, \
obi_open_view, \
obi_read_view_infos, \
obi_close_view_infos, \
obi_view_delete_column, \
obi_view_add_column, \
obi_view_get_column, \
obi_view_get_column, \
obi_view_get_pointer_on_column_in_view, \
obi_select_line, \
obi_select_lines, \
obi_save_and_close_view, \
VIEW_TYPE_NUC_SEQS, \
NUC_SEQUENCE_COLUMN, \
ID_COLUMN, \
DEFINITION_COLUMN, \
QUALITY_COLUMN
from libc.stdlib cimport malloc
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer
cdef class OBIDMS_column :
# Should only be initialized through a subclass
def __init__(self, OBIView view, str column_name):
cdef OBIDMS_column_p column_p
cdef OBIDMS_column_p* column_pp
column_pp = obi_view_get_pointer_on_column_in_view(view.pointer, str2bytes(column_name))
column_p = column_pp[0] # TODO ugly cython dereferencing but can't find better
# Fill structure
self.pointer = column_pp
self.dms = view.dms
self.view = view
self.data_type = bytes2str(name_data_type((column_p.header).returned_data_type))
self.column_name = bytes2str((column_p.header).name)
self.nb_elements_per_line = (column_p.header).nb_elements_per_line
self.elements_names = (bytes2str((column_p.header).elements_names)).split(';')
def __setitem__(self, index_t line_nb, object value):
self.set_line(line_nb, value)
def __getitem__(self, index_t line_nb):
return self.get_line(line_nb)
def __len__(self):
return (self.pointer)[0].header.lines_used
def __sizeof__(self):
return ((self.pointer)[0].header.header_size + (self.pointer)[0].header.data_size)
def __iter__(self):
# Declarations
cdef index_t lines_used
cdef index_t line_nb
# Yield each line
lines_used = (self.pointer)[0].header.lines_used
for line_nb in range(lines_used):
yield self.get_line(line_nb)
cpdef update_pointer(self):
self.pointer = <OBIDMS_column_p*> obi_view_get_pointer_on_column_in_view(self.view.pointer, str2bytes(self.column_name))
cpdef list get_elements_names(self):
return self.elements_names
cpdef str get_data_type(self):
return self.data_type
cpdef index_t get_nb_lines_used(self):
return (self.pointer)[0].header.lines_used
cpdef str get_creation_date(self):
return bytes2str(obi_format_date((self.pointer)[0].header.creation_date))
cpdef str get_comments(self):
return bytes2str((self.pointer)[0].header.comments)
def __str__(self) :
cdef str to_print
to_print = ''
for line in self :
to_print = to_print + str(line) + "\n"
return to_print
def __repr__(self) :
return (self.column_name + ", version " + str((self.pointer)[0].header.version) + ", data type: " + self.data_type)
cpdef close(self):
if obi_close_column((self.pointer)[0]) < 0 :
raise Exception("Problem closing a column")
@staticmethod
cdef object get_subclass_type(OBIDMS_column_p column_p) :
cdef object subclass
cdef OBIDMS_column_header_p header
cdef OBIType_t col_type
cdef bint col_writable
cdef bint col_one_element_per_line
header = column_p.header
col_type = header.returned_data_type
col_writable = column_p.writable
col_one_element_per_line = ((header.nb_elements_per_line) == 1)
if col_type == OBI_INT :
if col_one_element_per_line :
subclass = OBIDMS_column_int
else :
subclass = OBIDMS_column_multi_elts_int
elif col_type == OBI_FLOAT :
if col_one_element_per_line :
subclass = OBIDMS_column_float
else :
subclass = OBIDMS_column_multi_elts_float
elif col_type == OBI_BOOL :
if col_one_element_per_line :
subclass = OBIDMS_column_bool
else :
subclass = OBIDMS_column_multi_elts_bool
elif col_type == OBI_CHAR :
if col_one_element_per_line :
subclass = OBIDMS_column_char
else :
subclass = OBIDMS_column_multi_elts_char
elif col_type == OBI_QUAL :
if col_one_element_per_line :
subclass = OBIDMS_column_qual
else :
subclass = OBIDMS_column_multi_elts_qual
elif col_type == OBI_STR :
if col_one_element_per_line :
subclass = OBIDMS_column_str
else :
subclass = OBIDMS_column_multi_elts_str
elif col_type == OBI_SEQ :
if col_one_element_per_line :
subclass = OBIDMS_column_seq
else :
subclass = OBIDMS_column_multi_elts_seq
else :
raise Exception("Problem with the data type")
return subclass
######################################################################################################
cdef class OBIDMS_column_multi_elts(OBIDMS_column) :
def __getitem__(self, index_t line_nb):
return OBIDMS_column_line(self, line_nb)
cpdef set_line(self, index_t line_nb, dict values):
for element_name in values :
self.set_item(line_nb, element_name, values[element_name])
######################################################################################################
cdef class OBIDMS_column_line :
def __init__(self, OBIDMS_column column, index_t line_nb) :
self.index = line_nb
self.column = column
def __getitem__(self, str element_name) :
return self.column.get_item(self.index, element_name)
def __setitem__(self, str element_name, object value):
self.column.set_item(self.index, element_name, value)
def __contains__(self, str element_name):
return (element_name in self.column.elements_names)
def __repr__(self) :
return str(self.column.get_line(self.index))
##########################################
cdef class OBIView :
def __init__(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, list line_selection=None, str comments=""):
cdef Obiview_p view = NULL
cdef int i
cdef list col_list
cdef str col_name
cdef OBIDMS_column column
cdef OBIDMS_column_p column_p
cdef OBIDMS_column_header_p header
cdef index_t* line_selection_p
self.dms = dms
# Create the C array for the line selection if needed
if line_selection is not None :
line_selection_p = <index_t*> malloc((len(line_selection) + 1) * sizeof(index_t))
for i in range(len(line_selection)) :
line_selection_p[i] = line_selection[i]
line_selection_p[len(line_selection)] = -1
else :
line_selection_p = NULL
# Create the view if needed
if new :
if view_to_clone is not None :
if type(view_to_clone) == str :
view = obi_new_view_cloned_from_name(dms.pointer, str2bytes(view_name), str2bytes(view_to_clone), line_selection_p, str2bytes(comments))
else :
view = obi_new_view(dms.pointer, str2bytes(view_name), (<OBIView> view_to_clone).pointer, line_selection_p, str2bytes(comments))
elif view_to_clone is None :
view = obi_new_view(dms.pointer, str2bytes(view_name), NULL, line_selection_p, str2bytes(comments))
# Else, open the existing view
elif not new :
if view_name is not None :
view = obi_open_view(dms.pointer, str2bytes(view_name))
elif view_name is None :
view = obi_open_view(dms.pointer, NULL) # TODO discuss
if view == NULL :
raise Exception("Error creating/opening a view")
self.pointer = view
self.name = bytes2str(view.name)
# Go through columns to build list of corresponding python instances
self.columns = {}
for i in range(view.column_count) :
column_p = <OBIDMS_column_p> (view.columns)[i]
header = (column_p).header
col_name = bytes2str(header.name)
subclass = OBIDMS_column.get_subclass_type(column_p)
self.columns[col_name] = subclass(self, col_name)
def __repr__(self) :
cdef str s
s = str(self.name) + ", " + str(self.comments) + ", " + str(self.pointer.line_count) + " lines\n"
for column_name in self.columns :
s = s + self.columns[column_name].__repr__() + '\n'
return s
cpdef delete_column(self, str column_name) :
cdef int i
cdef Obiview_p view_p
cdef OBIDMS_column column
cdef OBIDMS_column_p column_p
cdef OBIDMS_column_header_p header
cdef str column_n
view = self.pointer
if obi_view_delete_column(view_p, str2bytes(column_name)) < 0 :
raise Exception("Problem deleting a column from a view")
# Update the dictionaries of column pointers and column objects, and update pointers in column objects (make function?):
(self.columns).pop(column_name)
for column_n in self.columns :
(self.columns[column_n]).update_pointer()
cpdef add_column(self,
str column_name,
obiversion_t version_number=-1,
str type='',
index_t nb_lines=0,
index_t nb_elements_per_line=1,
list elements_names=None,
str indexer_name="",
str comments="",
bint create=True
) :
cdef bytes column_name_b
cdef bytes elements_names_b
cdef object subclass
cdef OBIDMS_column_p column_p
column_name_b = str2bytes(column_name)
if nb_elements_per_line > 1 :
elements_names_b = str2bytes(';'.join(elements_names))
elif nb_elements_per_line == 1 :
elements_names_b = column_name_b
if type :
if type == 'OBI_INT' :
data_type = OBI_INT
elif type == 'OBI_FLOAT' :
data_type = OBI_FLOAT
elif type == 'OBI_BOOL' :
data_type = OBI_BOOL
elif type == 'OBI_CHAR' :
data_type = OBI_CHAR
elif type == 'OBI_QUAL' :
data_type = OBI_QUAL
elif type == 'OBI_STR' :
data_type = OBI_STR
elif type == 'OBI_SEQ' :
data_type = OBI_SEQ
else :
raise Exception("Invalid provided data type")
if (obi_view_add_column(self.pointer, column_name_b, version_number, # TODO should return pointer on column?
data_type, nb_lines, nb_elements_per_line,
elements_names_b, str2bytes(indexer_name),
str2bytes(comments), create) < 0) :
raise Exception("Problem adding a column in a view")
# Get the column pointer
column_p = obi_view_get_column(self.pointer, column_name_b)
# Open and store the subclass
subclass = OBIDMS_column.get_subclass_type(column_p)
(self.columns)[column_name] = subclass(self, column_name)
cpdef save_and_close(self) :
if (obi_save_and_close_view(self.pointer) < 0) :
raise Exception("Problem closing a view")
def __iter__(self):
# iter on each line of all columns
# Declarations
cdef index_t lines_used
cdef index_t line_nb
cdef OBIView_line line # TODO Check that this works for NUC SEQ views
# Yield each line
lines_used = (self.pointer).line_count
for line_nb in range(lines_used) :
line = self[line_nb]
yield line
def __getitem__(self, object item) :
if type(item) == str :
return (self.columns)[item]
elif type(item) == int :
return OBIView_line(self, item)
cpdef select_line(self, index_t line_nb) :
if obi_select_line(self.pointer, line_nb) < 0 :
raise Exception("Problem selecting a line")
cpdef select_lines(self, list line_selection) :
cdef index_t* line_selection_p
line_selection_p = <index_t*> malloc((len(line_selection) + 1) * sizeof(index_t))
for i in range(len(line_selection)) :
line_selection_p[i] = line_selection[i]
line_selection_p[len(line_selection)] = -1
if obi_select_lines(self.pointer, line_selection_p) < 0 :
raise Exception("Problem selecting a list of lines")
def __contains__(self, str column_name):
return (column_name in self.columns)
def __str__(self) :
cdef OBIView_line line
cdef str to_print
to_print = ""
for line in self.__iter__() :
to_print = to_print + str(line) + "\n"
return to_print
#############################################
cdef class OBIView_NUC_SEQS(OBIView):
def __init__(self, OBIDMS dms, str view_name, bint new=False, object view_to_clone=None, list line_selection=None, str comments=""):
cdef Obiview_p view = NULL
cdef int i
cdef list col_list
cdef str col_name
cdef OBIDMS_column column
cdef OBIDMS_column_p column_p
cdef OBIDMS_column_header_p header
cdef index_t* line_selection_p
self.dms = dms
if line_selection is not None :
line_selection_p = <index_t*> malloc((len(line_selection) + 1) * sizeof(index_t))
for i in range(len(line_selection)) :
line_selection_p[i] = line_selection[i]
line_selection_p[len(line_selection)] = -1
else :
line_selection_p = NULL
if new :
if view_to_clone is not None :
if type(view_to_clone) == str :
view = obi_new_view_nuc_seqs_cloned_from_name(dms.pointer, str2bytes(view_name), str2bytes(view_to_clone), line_selection_p, str2bytes(comments))
else :
view = obi_new_view_nuc_seqs(dms.pointer, str2bytes(view_name), (<OBIView> view_to_clone).pointer, line_selection_p, str2bytes(comments))
elif view_to_clone is None :
view = obi_new_view_nuc_seqs(dms.pointer, str2bytes(view_name), NULL, line_selection_p, str2bytes(comments))
elif not new :
if view_name is not None :
view = obi_open_view(dms.pointer, str2bytes(view_name))
elif view_name is None :
view = obi_open_view(dms.pointer, NULL)
if view == NULL :
raise Exception("Error creating/opening view")
self.pointer = view
self.name = bytes2str(view.name)
self.comments = bytes2str(view.comments)
# Go through columns to build list of corresponding python instances
self.columns = {}
for i in range(view.column_count) :
column_p = <OBIDMS_column_p> (view.columns)[i]
header = (column_p).header
col_name = bytes2str(header.name)
subclass = OBIDMS_column.get_subclass_type(column_p)
self.columns[col_name] = subclass(self, col_name)
self.ids = self.columns[bytes2str(ID_COLUMN)]
self.sequences = self.columns[bytes2str(NUC_SEQUENCE_COLUMN)]
self.definitions = self.columns[bytes2str(DEFINITION_COLUMN)]
self.qualities = self.columns[bytes2str(QUALITY_COLUMN)]
def __getitem__(self, object item) :
if type(item) == str :
return (self.columns)[item]
elif type(item) == int :
return OBI_Nuc_Seq_Stored(self, item)
def __setitem__(self, index_t line_idx, OBI_Nuc_Seq sequence_obj) :
for key in sequence_obj :
self[line_idx][key] = sequence_obj[key]
#############################################
cdef class OBIView_line :
def __init__(self, OBIView view, index_t line_nb) :
self.index = line_nb
self.view = view
def __getitem__(self, str column_name) :
return ((self.view).columns)[column_name][self.index]
def __setitem__(self, str column_name, object value):
# TODO detect multiple elements (dict type)? put somewhere else? but more risky (in get)
# TODO OBI_QUAL ?
cdef type value_type
cdef str value_obitype
if column_name not in self.view :
if value == None :
raise Exception("Trying to create a column from a None value (can't guess type)")
value_type = type(value)
if value_type == int :
value_obitype = 'OBI_INT'
elif value_type == float :
value_obitype = 'OBI_FLOAT'
elif value_type == bool :
value_obitype = 'OBI_BOOL'
elif value_type == str :
if only_ATGC(str2bytes(value)) : # TODO detect IUPAC?
value_obitype = 'OBI_SEQ'
elif len(value) == 1 :
value_obitype = 'OBI_CHAR'
elif (len(value) > 1) :
value_obitype = 'OBI_STR'
else :
raise Exception("Could not guess the type of a value to create a new column")
self.view.add_column(column_name, type=value_obitype)
(((self.view).columns)[column_name]).set_line(self.index, value)
def __contains__(self, str column_name):
return (column_name in self.view)
def __repr__(self):
cdef dict line
cdef str column_name
line = {}
for column_name in self.view.columns :
line[column_name] = self[column_name]
return str(line)
##########################################
cdef class OBIDMS :
def __init__(self, str dms_name) :
# Declarations
cdef bytes dms_name_b
# Format the character string to send to C function
dms_name_b = str2bytes(dms_name)
# Fill structure and create or open the DMS
self.dms_name = dms_name
self.pointer = obi_dms(<const_char_p> dms_name_b)
if self.pointer == NULL :
raise Exception("Failed opening or creating an OBIDMS")
cpdef close(self) :
if (obi_close_dms(self.pointer)) < 0 :
raise Exception("Problem closing an OBIDMS")
cpdef OBI_Taxonomy open_taxonomy(self, str taxo_name) :
return OBI_Taxonomy(self, taxo_name)
cpdef OBIView open_view(self, str view_name) :
cdef object view_class
cdef dict view_infos
view_infos = self.read_view_infos(view_name)
if view_infos["view_type"] == bytes2str(VIEW_TYPE_NUC_SEQS) :
view_class = OBIView_NUC_SEQS
else :
view_class = OBIView
return view_class(self, view_name)
cpdef OBIView new_view(self, str view_name, object view_to_clone=None, list line_selection=None, str view_type=None, str comments="") :
cdef object view_class
if view_type is not None :
if view_type == bytes2str(VIEW_TYPE_NUC_SEQS) :
view_class = OBIView_NUC_SEQS
else :
view_class = OBIView
return view_class(self, view_name, new=True, view_to_clone=view_to_clone, line_selection=line_selection, comments=comments)
cpdef dict read_view_infos(self, str view_name) :
all_views = self.read_views()
return all_views[view_name]
cpdef dict read_views(self) : # TODO function that prints the dic nicely and function that prints 1 view nicely. Add column type in col ref
cdef Obiviews_infos_all_p all_views_p
cdef Obiview_infos_p view_p
cdef Column_reference_p column_refs
cdef int nb_views
cdef int i, j
cdef str view_name
cdef str column_name
cdef dict views
cdef bytes name_b
views = {}
all_views_p = obi_read_view_infos(self.pointer)
if all_views_p == NULL :
raise Exception("No views to read")
nb_views = <int> (all_views_p.header).view_count
for i in range(nb_views) :
view_p = (<Obiview_infos_p> (all_views_p.view_infos)) + i
view_name = bytes2str(view_p.name)
views[view_name] = {}
views[view_name]["comments"] = bytes2str(view_p.comments)
views[view_name]["view_type"] = bytes2str(view_p.view_type)
views[view_name]["column_count"] = <int> view_p.column_count
views[view_name]["line_count"] = <int> view_p.line_count
views[view_name]["view_number"] = <int> view_p.view_number
views[view_name]["created_from"] = bytes2str(view_p.created_from)
views[view_name]["creation_date"] = bytes2str(obi_format_date(view_p.creation_date))
if (view_p.all_lines) :
views[view_name]["line_selection"] = None
else :
views[view_name]["line_selection"] = {}
views[view_name]["line_selection"]["column_name"] = bytes2str((view_p.line_selection).column_name)
views[view_name]["line_selection"]["version"] = <int> (view_p.line_selection).version
views[view_name]["column_references"] = {}
column_refs = view_p.column_references
for j in range(views[view_name]["column_count"]) :
column_name = bytes2str((column_refs[j]).column_name)
views[view_name]["column_references"][column_name] = {}
views[view_name]["column_references"][column_name]["version"] = column_refs[j].version
obi_close_view_infos(all_views_p);
return views