992 lines
35 KiB
Cython
Executable File
992 lines
35 KiB
Cython
Executable File
#cython: language_level=3
|
|
|
|
|
|
cdef dict __VIEW_CLASS__= {}
|
|
|
|
|
|
from libc.stdlib cimport malloc
|
|
|
|
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
|
from obitools3.version import version
|
|
|
|
from ..capi.obiview cimport Alias_column_pair_p, \
|
|
obi_new_view, \
|
|
obi_open_view, \
|
|
obi_clone_view, \
|
|
obi_rollback_view, \
|
|
obi_save_and_close_view, \
|
|
obi_view_get_pointer_on_column_in_view, \
|
|
obi_view_delete_column, \
|
|
obi_view_create_column_alias, \
|
|
obi_view_write_comments, \
|
|
obi_delete_view, \
|
|
obi_view_formatted_infos, \
|
|
obi_view_formatted_infos_one_line
|
|
|
|
from ..capi.obidmscolumn cimport OBIDMS_column_p
|
|
from ..capi.obidms cimport OBIDMS_p
|
|
|
|
from obitools3.utils cimport tobytes, \
|
|
str2bytes, \
|
|
bytes2str, \
|
|
tostr, \
|
|
bytes2str_object, \
|
|
str2bytes_object, \
|
|
clean_empty_values_from_object, \
|
|
get_obitype
|
|
|
|
from ..object cimport OBIDeactivatedInstanceError
|
|
|
|
from obitools3.dms.view import typed_view
|
|
|
|
from ..capi.obitypes cimport is_a_DNA_seq, \
|
|
OBI_VOID, \
|
|
OBI_BOOL, \
|
|
OBI_CHAR, \
|
|
OBI_FLOAT, \
|
|
OBI_INT, \
|
|
OBI_QUAL, \
|
|
OBI_SEQ, \
|
|
OBI_STR
|
|
|
|
from ..capi.obidms cimport obi_import_view
|
|
|
|
from obitools3.format.tab import TabFormat
|
|
|
|
from cpython.exc cimport PyErr_CheckSignals
|
|
|
|
import importlib
|
|
import inspect
|
|
import pkgutil
|
|
import json
|
|
import sys
|
|
|
|
from libc.stdlib cimport free
|
|
|
|
|
|
cdef class View(OBIWrapper) :
|
|
|
|
cdef inline Obiview_p pointer(self) :
|
|
return <Obiview_p>(self._pointer)
|
|
|
|
@staticmethod
|
|
# TODO try cdef again
|
|
def get_view_class(bytes view_type):
|
|
global __VIEW_CLASS__
|
|
return __VIEW_CLASS__.get(view_type, View)
|
|
|
|
|
|
@staticmethod
|
|
def import_view(object dms_1, object dms_2, object view_name_1, object view_name_2): # TODO argument to import line by line to avoid huge AVL copy
|
|
if obi_import_view(tobytes(dms_1), tobytes(dms_2), tobytes(view_name_1), tobytes(view_name_2)) < 0 :
|
|
raise Exception("Error importing a view")
|
|
|
|
|
|
@staticmethod
|
|
def delete_view(DMS dms, object view_name):
|
|
if (obi_delete_view(dms.pointer(), tobytes(view_name)) < 0):
|
|
raise Exception("Error deleting a view")
|
|
|
|
|
|
@staticmethod
|
|
def new(DMS dms,
|
|
object view_name,
|
|
object comments={}):
|
|
|
|
cdef bytes view_name_b = tobytes(view_name)
|
|
cdef bytes comments_b
|
|
cdef str message
|
|
cdef void* pointer
|
|
|
|
cdef View view # @DuplicatedSignature
|
|
|
|
comments_b = str2bytes(json.dumps(bytes2str_object(comments)))
|
|
|
|
pointer = <void*>obi_new_view(<OBIDMS_p>dms._pointer,
|
|
view_name_b,
|
|
NULL,
|
|
NULL,
|
|
comments_b)
|
|
|
|
if pointer == NULL :
|
|
message = "Error : Cannot create view %s" % bytes2str(view_name_b)
|
|
raise RuntimeError(message)
|
|
|
|
view = OBIWrapper.new_wrapper(View, pointer)
|
|
view._dms = dms
|
|
dms.register(view)
|
|
|
|
return view
|
|
|
|
|
|
def clone(self,
|
|
object view_name,
|
|
object comments={}):
|
|
|
|
cdef bytes view_name_b = tobytes(view_name)
|
|
cdef bytes comments_b
|
|
cdef void* pointer
|
|
cdef View view
|
|
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
comments_b = str2bytes(json.dumps(bytes2str_object(dict(comments)))) # TODO hmmmmm function in View_comments class probably
|
|
|
|
pointer = <void*> obi_clone_view(self._dms.pointer(),
|
|
self.pointer(),
|
|
view_name_b,
|
|
NULL,
|
|
comments_b)
|
|
|
|
if pointer == NULL :
|
|
raise RuntimeError("Error : Cannot clone view %s into view %s"
|
|
% (str(self.name),
|
|
bytes2str(view_name_b))
|
|
)
|
|
|
|
view = OBIWrapper.new_wrapper(type(self), pointer)
|
|
view._dms = self._dms
|
|
self._dms.register(view)
|
|
|
|
return view
|
|
|
|
|
|
@staticmethod
|
|
def open(DMS dms, # @ReservedAssignment
|
|
object view_name):
|
|
|
|
cdef bytes view_name_b = tobytes(view_name)
|
|
cdef void* pointer
|
|
cdef View view
|
|
cdef type view_class
|
|
|
|
pointer = <void*> obi_open_view(dms.pointer(),
|
|
view_name_b)
|
|
|
|
if pointer == NULL :
|
|
raise RuntimeError("Error : Cannot open view %s" % bytes2str(view_name_b))
|
|
|
|
view_class = View.get_view_class((<Obiview_p>pointer).infos.view_type)
|
|
view = OBIWrapper.new_wrapper(view_class, pointer)
|
|
|
|
view._dms = dms
|
|
dms.register(view)
|
|
|
|
return view
|
|
|
|
|
|
def close(self):
|
|
cdef Obiview_p pointer = self.pointer()
|
|
|
|
if self.active() :
|
|
self._dms.unregister(self)
|
|
OBIWrapper.close(self)
|
|
if obi_save_and_close_view(pointer) < 0 :
|
|
raise Exception("Problem closing view %s" %
|
|
bytes2str(self.name))
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def __repr__(self) :
|
|
cdef str s
|
|
cdef char* sc
|
|
cdef Obiview_p pointer = self.pointer()
|
|
sc = obi_view_formatted_infos(pointer, False)
|
|
s = bytes2str(sc)
|
|
free(sc)
|
|
return s
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def repr_longformat(self) :
|
|
cdef str s
|
|
cdef char* sc
|
|
cdef Obiview_p pointer = self.pointer()
|
|
sc = obi_view_formatted_infos(pointer, True)
|
|
s = bytes2str(sc)
|
|
free(sc)
|
|
return s
|
|
|
|
|
|
cpdef print_to_output(self, object output, bint noprogressbar=False):
|
|
|
|
cdef int i
|
|
cdef Line entry
|
|
|
|
self.checkIsActive(self)
|
|
|
|
# Initialize the progress bar
|
|
if noprogressbar == False:
|
|
pb = ProgressBar(len(self))
|
|
else:
|
|
pb = None
|
|
i=0
|
|
for entry in self:
|
|
PyErr_CheckSignals()
|
|
if pb is not None:
|
|
pb(i)
|
|
output.write(entry.repr_bytes()+b"\n")
|
|
i+=1
|
|
if pb is not None:
|
|
pb(len(self), force=True)
|
|
print("", file=sys.stderr)
|
|
|
|
|
|
def keys(self):
|
|
|
|
cdef bytes col_alias
|
|
cdef int i
|
|
cdef Obiview_p pointer = self.pointer()
|
|
cdef int nb_column = pointer.infos.column_count
|
|
cdef Alias_column_pair_p column_p = pointer.infos.column_references
|
|
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
for i in range(nb_column) :
|
|
col_alias = column_p[i].alias
|
|
yield col_alias
|
|
|
|
|
|
def get_column(self,
|
|
object column_name):
|
|
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
return Column.open(self, tobytes(column_name))
|
|
|
|
|
|
def get_column_with_idx(self,
|
|
int column_idx):
|
|
|
|
cdef Obiview_p pointer = self.pointer()
|
|
cdef int nb_column = pointer.infos.column_count
|
|
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
if column_idx > nb_column :
|
|
raise IndexError(column_idx, "No column with this index")
|
|
|
|
return Column.open(self, pointer.infos.column_references[column_idx].alias)
|
|
|
|
|
|
cpdef delete_column(self,
|
|
object column_name,
|
|
bint delete_file=False) :
|
|
|
|
cdef bytes column_name_b = tobytes(column_name)
|
|
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
# Close the cython instance first
|
|
col = self[column_name_b]
|
|
col.close()
|
|
|
|
# Remove the column from the view which closes the C structure
|
|
if obi_view_delete_column(self.pointer(), column_name_b, delete_file) < 0 :
|
|
raise RollbackException("Problem deleting column %s from a view",
|
|
bytes2str(column_name_b), self)
|
|
|
|
|
|
cpdef rename_column(self,
|
|
object current_name,
|
|
object new_name):
|
|
|
|
cdef Column column
|
|
cdef bytes current_name_b = tobytes(current_name)
|
|
cdef bytes new_name_b = tobytes(new_name)
|
|
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
if (obi_view_create_column_alias(self.pointer(),
|
|
tobytes(current_name_b),
|
|
tobytes(new_name_b)) < 0) :
|
|
raise RollbackException("Problem in renaming column %s to %s" % (
|
|
bytes2str(current_name_b),
|
|
bytes2str(new_name_b)), self)
|
|
|
|
|
|
# TODO warning, not multithreading compliant
|
|
cpdef Column rewrite_column_with_diff_attributes(self,
|
|
object column_name,
|
|
obitype_t new_data_type=<obitype_t>OBI_VOID,
|
|
index_t new_nb_elements_per_line=0,
|
|
list new_elements_names=None,
|
|
bint rewrite_last_line=False) :
|
|
|
|
cdef Column old_column
|
|
cdef Column new_column
|
|
cdef index_t length = len(self)
|
|
cdef column_name_b = tobytes(column_name)
|
|
|
|
if rewrite_last_line is False:
|
|
length-=1
|
|
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
old_column = self.get_column(column_name_b)
|
|
|
|
if new_data_type == 0 :
|
|
new_data_type = old_column.data_type
|
|
|
|
if new_nb_elements_per_line == 0 :
|
|
new_nb_elements_per_line = old_column.nb_elements_per_line
|
|
|
|
if new_elements_names is None :
|
|
new_elements_names = old_column._elements_names
|
|
|
|
new_column = Column.new_column(self, old_column.pointer().header.name, new_data_type,
|
|
nb_elements_per_line=new_nb_elements_per_line, elements_names=new_elements_names,
|
|
dict_column=(new_nb_elements_per_line>1), comments=old_column.comments, alias=column_name_b+tobytes('___new___'))
|
|
|
|
switch_to_dict = not old_column.dict_column and new_nb_elements_per_line > 1
|
|
ori_key = old_column._elements_names[0]
|
|
|
|
for i in range(length) :
|
|
if switch_to_dict :
|
|
new_column[i] = {ori_key: old_column[i]}
|
|
else:
|
|
new_column[i] = old_column[i]
|
|
|
|
# Remove old column from view
|
|
self.delete_column(column_name_b, delete_file=True)
|
|
|
|
# Rename new
|
|
new_column.name = column_name_b
|
|
|
|
return new_column
|
|
|
|
|
|
cpdef Line_selection new_selection(self, list lines=None):
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
return Line_selection(self, lines)
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def __iter__(self):
|
|
# Iteration on each line of all columns
|
|
|
|
# Declarations
|
|
cdef index_t line_nb
|
|
|
|
# Yield each line
|
|
for line_nb in range(self.line_count) :
|
|
yield Line(self, line_nb)
|
|
|
|
|
|
# TODO test time gain without
|
|
@OBIWrapper.checkIsActive
|
|
def __getitem__(self, object ref) :
|
|
if type(ref) == int :
|
|
return Line(self, ref)
|
|
else : # TODO assume str or bytes for optimization (discuss)
|
|
return self.get_column(ref) # note: slow, always better to keep column ref in a variable
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def __setitem__(self, index_t idx, object item) :
|
|
cdef Column col
|
|
line = self[idx]
|
|
for k in item :
|
|
# If setting from another View Line and the column doesn't exist, create it based on the informations from the other View
|
|
# TODO use clone_column
|
|
if isinstance(item, Line) and tostr(k) not in self:
|
|
col = item.view[k]
|
|
Column.new_column(self,
|
|
col.original_name,
|
|
col.data_type_int,
|
|
nb_elements_per_line = col.nb_elements_per_line,
|
|
elements_names = col._elements_names,
|
|
dict_column = col.dict_column,
|
|
tuples = col.tuples,
|
|
to_eval = col.to_eval,
|
|
comments = col.comments,
|
|
alias=k
|
|
)
|
|
line[k] = item[k]
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def __contains__(self, object column_name):
|
|
return (tobytes(column_name) in self.keys())
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def __len__(self):
|
|
return(self.line_count)
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def __str__(self) :
|
|
cdef Line line
|
|
cdef str to_print
|
|
to_print = ""
|
|
for line in self :
|
|
to_print = to_print + str(line) + "\n"
|
|
return to_print
|
|
|
|
|
|
@staticmethod
|
|
def _config_to_dict(dict config, str command_name, str command_line, list input_str=None, list input_dms_name=None, list input_view_name=None):
|
|
INVALID_KEYS = ["__root_config__", "module", "nocreatedms", "logger", "defaultdms", "inputview", "outputview", "log", "loglevel", "progress", "verbose"]
|
|
comments = {}
|
|
comments["obi"] = {k: config["obi"][k] for k in config["obi"] if k not in INVALID_KEYS}
|
|
comments[command_name] = config[command_name] # TODO or discuss update instead of nested dict
|
|
comments["command_line"] = command_line
|
|
if input_str is None and input_dms_name is None and input_view_name is None:
|
|
raise Exception("Can't build view configuration with None input") # TODO discuss
|
|
if (input_dms_name is not None and input_view_name is not None and len(input_dms_name) != len(input_view_name)) or \
|
|
(input_dms_name is None and input_view_name is not None) or \
|
|
(input_dms_name is not None and input_view_name is None):
|
|
raise Exception("Error building view configuration: there must be as many input DMS names as input view names") # TODO discuss
|
|
comments["input_dms_name"] = input_dms_name
|
|
comments["input_view_name"] = input_view_name
|
|
if input_str is None:
|
|
input_str = []
|
|
for i in range(len(input_view_name)):
|
|
input_str.append(tostr(input_dms_name[i])+"/"+tostr(input_view_name[i]))
|
|
comments["input_str"] = input_str
|
|
comments["version"] = version
|
|
return bytes2str_object(comments)
|
|
|
|
|
|
@staticmethod
|
|
def print_config(dict config, str command_name, str command_line, list input_str=None, list input_dms_name=None, list input_view_name=None):
|
|
config_dict = View._config_to_dict(config, command_name, command_line, \
|
|
input_str=input_str, input_dms_name=input_dms_name, input_view_name=input_view_name)
|
|
# Clean virtually empty values
|
|
config_dict = clean_empty_values_from_object(config_dict, exclude=[View_comments.KEEP_KEYS])
|
|
# Convert to json string
|
|
comments_json = json.dumps(config_dict)
|
|
return str2bytes(comments_json)
|
|
|
|
|
|
@staticmethod
|
|
def get_config_dict(dict config, str command_name, str command_line, list input_str=None, list input_dms_name=None, list input_view_name=None):
|
|
config_dict = View._config_to_dict(config, command_name, command_line, \
|
|
input_str=input_str, input_dms_name=input_dms_name, input_view_name=input_view_name)
|
|
# Clean virtually empty values
|
|
config_dict = clean_empty_values_from_object(config_dict, exclude=[View_comments.KEEP_KEYS])
|
|
return config_dict
|
|
|
|
|
|
@OBIWrapper.checkIsActive
|
|
def write_config(self, dict config, str command_name, str command_line, list input_str=None, list input_dms_name=None, list input_view_name=None):
|
|
self.comments = View._config_to_dict(config, command_name, command_line, \
|
|
input_str=input_str, input_dms_name=input_dms_name, input_view_name=input_view_name)
|
|
|
|
|
|
# command and view history DOT graph property getter in the form of a list (to remove duplicate elements afterwards)
|
|
@property
|
|
def dot_history_graph_list(self):
|
|
history = []
|
|
view_history = self.view_history
|
|
history.append(b"\tnode [shape=record]\n")
|
|
history.append(b"\tcompound=true\n")
|
|
for i in range(len(view_history)):
|
|
level = view_history[i]
|
|
for input in level:
|
|
# Command node
|
|
command = b"\""+level[input][b"command_line"]+b"\""
|
|
s = b"\t"
|
|
s+=command
|
|
s+=b" [style=filled, color=lightblue]\n"
|
|
history.append(s)
|
|
if len(input) > 1:
|
|
# Create invisible node
|
|
invi_node_name_no_quotes = b"_".join(input)
|
|
invi_node_name_quotes = b"\""+ invi_node_name_no_quotes + b"\""
|
|
s = b"\t"
|
|
s+=invi_node_name_quotes
|
|
s+=b" [width=0, style=invis, shape=point]\n"
|
|
history.append(s)
|
|
# Connect all inputs to the invisible node
|
|
for elt in input:
|
|
s = b"\t"
|
|
s = s+b"\""+elt+b"\""
|
|
s+=b" -> "
|
|
s+=invi_node_name_quotes
|
|
s+=b" [arrowhead=none]\n"
|
|
history.append(s)
|
|
to_connect_to_command = invi_node_name_no_quotes
|
|
else:
|
|
to_connect_to_command = input[0]
|
|
# Color node if input element is a taxonomy (to do for output nodes too if taxonomy history is to be recorded)
|
|
for elt in input:
|
|
if b"taxonomy" in elt:
|
|
s = b"\t"
|
|
s = s+b"\""+elt+b"\""
|
|
s+=b" [style=filled, color=navajowhite]\n"
|
|
history.append(s)
|
|
# Connect input to command
|
|
s = b"\t"
|
|
s = s+b"\""+to_connect_to_command+b"\""
|
|
s+=b" -> "
|
|
s+=command
|
|
s+=b"\n"
|
|
history.append(s)
|
|
# Connect command to output
|
|
s = b"\t"
|
|
s+=command
|
|
s+=b" -> "
|
|
s = s+b"\""+level[input][b"output"]+b"\""
|
|
s+=b"\n"
|
|
history.append(s)
|
|
return history
|
|
|
|
|
|
# command history DOT graph property getter in the form of a bytes string
|
|
@property
|
|
def dot_history_graph(self):
|
|
uniq_graph = []
|
|
for elt in self.dot_history_graph_list:
|
|
if elt not in uniq_graph:
|
|
uniq_graph.append(elt)
|
|
uniq_graph.insert(0, b"digraph \""+self.name+b"\" {\n")
|
|
uniq_graph.append(b"}")
|
|
return b"".join(uniq_graph)
|
|
|
|
|
|
# ASCII command history graph property getter
|
|
@property
|
|
def ascii_history(self):
|
|
arrow = b"\t|\n\tV\n"
|
|
s = b""
|
|
first = True
|
|
for level in self.view_history:
|
|
command_list = [level[input][b"command_line"] for input in level.keys()]
|
|
if not first:
|
|
s+=arrow
|
|
else:
|
|
first=False
|
|
for command in command_list:
|
|
s+=command
|
|
s+=b"\n"
|
|
return s
|
|
|
|
|
|
# bash command history property getter
|
|
@property
|
|
def bash_history(self):
|
|
s = b""
|
|
first = True
|
|
for level in self.view_history:
|
|
command_list = [level[input][b"command_line"] for input in level.keys()]
|
|
for command in command_list:
|
|
s+=b"obi "
|
|
s+=command
|
|
s+=b"\n"
|
|
return s
|
|
|
|
|
|
# View and command history property getter
|
|
@property
|
|
def view_history(self):
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
current_level = [self]
|
|
history = []
|
|
while current_level[0] is not None: # TODO not sure about robustness
|
|
top_level = []
|
|
level_dict = {}
|
|
for element in current_level:
|
|
if element is not None:
|
|
if element.comments[b"input_dms_name"] is not None :
|
|
for i in range(len(element.comments[b"input_dms_name"])) :
|
|
if b"/" not in element.comments[b"input_view_name"][i] and element.comments[b"input_view_name"][i] in element.dms \
|
|
and element.comments[b"input_dms_name"][i] == element.dms.name : # Same DMS and not a special element like a taxonomy and view was not deleted
|
|
top_level.append(element.dms[element.comments[b"input_view_name"][i]])
|
|
else:
|
|
top_level.append(None)
|
|
else:
|
|
top_level.append(None)
|
|
level_dict[tuple(element.comments[b"input_str"])] = {}
|
|
level_dict[tuple(element.comments[b"input_str"])][b"output"] = element.dms.name+b"/"+element.name
|
|
level_dict[tuple(element.comments[b"input_str"])][b"command_line"] = element.comments[b"command_line"]
|
|
history.insert(0, level_dict)
|
|
current_level = top_level
|
|
return history
|
|
|
|
|
|
# Width (column count) property getter
|
|
@property
|
|
def width(self):
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
return self.pointer().infos.column_count
|
|
|
|
|
|
# DMS property getter
|
|
@property
|
|
def dms(self):
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
return self._dms
|
|
|
|
|
|
# line_count property getter
|
|
@property
|
|
def line_count(self):
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
return self.pointer().infos.line_count
|
|
|
|
# read_only state property getter
|
|
@property
|
|
def read_only(self):
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
return self.pointer().read_only
|
|
|
|
# name property getter
|
|
@property
|
|
def name(self):
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
return <bytes> self.pointer().infos.name
|
|
|
|
# view type property getter
|
|
@property
|
|
def type(self): # @ReservedAssignment
|
|
if not self.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
return <bytes> self.pointer().infos.view_type
|
|
|
|
# comments property getter
|
|
@property
|
|
def comments(self):
|
|
return View_comments(self)
|
|
@comments.setter
|
|
def comments(self, object value):
|
|
View_comments(self, value)
|
|
|
|
|
|
cdef class View_comments(dict): # Not thread safe
|
|
|
|
KEEP_KEYS = [b"input_dms_name", b"input_view_name", b"input_str", "input_dms_name", "input_view_name", "input_str"]
|
|
|
|
def __init__(self, View view, value=None) :
|
|
if not view.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
self._view = view
|
|
if value is not None:
|
|
self.update(value) # TODO test and discuss not overwriting (could use replace bool)
|
|
self._update_from_file()
|
|
|
|
def _update_from_file(self):
|
|
cdef bytes comments_json
|
|
cdef str comments_json_str
|
|
cdef Obiview_p view_p
|
|
cdef View view
|
|
if not self._view.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
view = self._view
|
|
view_p = <Obiview_p>(view.pointer())
|
|
comments_json = <bytes> view_p.infos.comments
|
|
comments_json_str = bytes2str(comments_json)
|
|
comments_dict = json.loads(comments_json_str)
|
|
comments_dict = str2bytes_object(comments_dict)
|
|
super(View_comments, self).update(comments_dict)
|
|
|
|
def __getitem__(self, object key):
|
|
if not self._view.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
if type(key) == str:
|
|
key = str2bytes(key)
|
|
self._update_from_file()
|
|
return super(View_comments, self).__getitem__(key)
|
|
|
|
def __setitem__(self, object key, object value):
|
|
cdef Obiview_p view_p
|
|
cdef View view
|
|
|
|
if not self._view.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
view = self._view
|
|
view_p = <Obiview_p>(view.pointer())
|
|
|
|
# Remove virtually empty values from the object # TODO discuss
|
|
clean_empty_values_from_object(value, exclude=[self.KEEP_KEYS])
|
|
|
|
# If value is virtually empty, don't add it # TODO discuss
|
|
if (key not in self.KEEP_KEYS) and (value is None or len(value) == 0):
|
|
return
|
|
|
|
# Convert to bytes
|
|
if type(key) == str:
|
|
key = str2bytes(key)
|
|
value_bytes = str2bytes_object(value)
|
|
|
|
# Update dict with comments already written in file
|
|
self._update_from_file()
|
|
|
|
# Add new element # TODO don't overwrite?
|
|
super(View_comments, self).__setitem__(key, value_bytes)
|
|
|
|
# Convert to str because json library doens't like bytes
|
|
dict_str = {key:item for key,item in self.items()}
|
|
dict_str = bytes2str_object(dict_str)
|
|
|
|
# Convert to json string
|
|
comments_json = json.dumps(dict_str)
|
|
|
|
# Write new comments
|
|
if obi_view_write_comments(view_p, tobytes(comments_json)) < 0:
|
|
raise Exception("Could not write view comments, view:", view.name, "comments:", comments_json)
|
|
|
|
def update(self, value):
|
|
for k,v in value.items():
|
|
self[k] = v
|
|
|
|
def __contains__(self, key):
|
|
return super(View_comments, self).__contains__(tobytes(key))
|
|
|
|
def __str__(self):
|
|
return bytes2str(self._view.pointer().infos.comments)
|
|
|
|
|
|
cdef class Line :
|
|
|
|
def __init__(self, View view, index_t line_nb) :
|
|
self._index = line_nb
|
|
self._view = view
|
|
|
|
|
|
def __getitem__(self, object column_name) :
|
|
return (self._view).get_column(tobytes(column_name))[self._index]
|
|
|
|
|
|
def __setitem__(self, object column_name, object value): # TODO discuss
|
|
# TODO detect multiple elements (dict type)? put somewhere else? but more risky (in get)
|
|
# TODO OBI_QUAL ?
|
|
cdef type value_type
|
|
cdef obitype_t value_obitype
|
|
cdef bytes value_b
|
|
cdef bytes column_name_b
|
|
|
|
column_name_b = tobytes(column_name)
|
|
|
|
# TODO use functions in utils
|
|
|
|
#print(column_name, "value:", value, type(value))
|
|
|
|
if column_name_b not in self._view :
|
|
if value is None :
|
|
return # TODO discuss. This means that columns aren't created until an identifiable type is found
|
|
#raise RollbackException("Trying to create a column from a None value (can't guess type)", self)
|
|
value_obitype = get_obitype(value)
|
|
if value_obitype == OBI_VOID :
|
|
raise CantGuessTypeException(value)
|
|
#raise RollbackException("Could not guess the type of a value to create a new column", self._view)
|
|
|
|
Column.new_column(self._view, column_name_b, value_obitype)
|
|
|
|
(self._view).get_column(column_name_b).set_line(self._index, value)
|
|
|
|
|
|
def __iter__(self):
|
|
cdef bytes column_name
|
|
for column_name in (self._view).keys() :
|
|
yield column_name
|
|
|
|
|
|
def keys(self):
|
|
return self._view.keys()
|
|
|
|
|
|
def __contains__(self, object column_name):
|
|
return (tobytes(column_name) in self.keys())
|
|
|
|
|
|
def __repr__(self):
|
|
return bytes2str(self.repr_bytes())
|
|
|
|
|
|
cpdef repr_bytes(self):
|
|
formatter = TabFormat(header=False)
|
|
return formatter(self)
|
|
|
|
|
|
# View property getter
|
|
@property
|
|
def view(self):
|
|
return self._view
|
|
|
|
# index property getter
|
|
@property
|
|
def index(self):
|
|
return self._index
|
|
|
|
|
|
# cpdef dict get_view_infos(self, str view_name) :
|
|
#
|
|
# cdef Obiview_infos_p view_infos_p
|
|
# cdef dict view_infos_d
|
|
# cdef Alias_column_pair_p column_refs
|
|
# cdef int i, j
|
|
# cdef str column_name
|
|
#
|
|
# view_infos_p = obi_view_map_file(self._pointer,
|
|
# tobytes(view_name))
|
|
# view_infos_d = {}
|
|
# view_infos_d["name"] = bytes2str(view_infos_p.name)
|
|
# view_infos_d["comments"] = bytes2str(view_infos_p.comments)
|
|
# view_infos_d["view_type"] = bytes2str(view_infos_p.view_type)
|
|
# view_infos_d["column_count"] = <int> view_infos_p.column_count
|
|
# view_infos_d["line_count"] = <int> view_infos_p.line_count
|
|
# view_infos_d["created_from"] = bytes2str(view_infos_p.created_from)
|
|
# view_infos_d["creation_date"] = bytes2str(obi_format_date(view_infos_p.creation_date))
|
|
# if (view_infos_p.all_lines) :
|
|
# view_infos_d["line_selection"] = None
|
|
# else :
|
|
# view_infos_d["line_selection"] = {}
|
|
# view_infos_d["line_selection"]["column_name"] = bytes2str((view_infos_p.line_selection).column_name)
|
|
# view_infos_d["line_selection"]["version"] = <int> (view_infos_p.line_selection).version
|
|
# view_infos_d["column_references"] = {}
|
|
# column_references = view_infos_p.column_references
|
|
# for j in range(view_infos_d["column_count"]) :
|
|
# column_name = bytes2str((column_references[j]).alias)
|
|
# view_infos_d["column_references"][column_name] = {}
|
|
# view_infos_d["column_references"][column_name]["original_name"] = bytes2str((column_references[j]).column_refs.column_name)
|
|
# view_infos_d["column_references"][column_name]["version"] = (column_references[j]).column_refs.version
|
|
#
|
|
# obi_view_unmap_file(self._pointer, view_infos_p)
|
|
#
|
|
# return view_infos_d
|
|
|
|
|
|
|
|
cdef class Line_selection(list):
|
|
|
|
def __init__(self, View view, lines=None) :
|
|
if view._pointer == NULL:
|
|
raise Exception("Error: trying to create a line selection with an invalidated view")
|
|
self._view = view
|
|
self._view_name = view.name
|
|
|
|
if lines is not None:
|
|
self.extend(lines)
|
|
|
|
|
|
def extend(self, iterable):
|
|
cdef index_t i
|
|
cdef index_t max_i = self._view.line_count
|
|
|
|
for i in iterable: # TODO this is already checked in C
|
|
if i > max_i:
|
|
raise RuntimeError("Error: trying to select line %d beyond the line count %d of view %s" %
|
|
(i,
|
|
max_i,
|
|
self._view_name)
|
|
)
|
|
list.append(self,i)
|
|
|
|
|
|
def append(self, index_t idx) :
|
|
if idx >= self._view.line_count :
|
|
raise IndexError("Error: trying to select line %d beyond the line count %d of view %s" %
|
|
(idx,
|
|
self._view.line_count,
|
|
bytes2str(self.name))
|
|
)
|
|
list.append(self,idx)
|
|
|
|
|
|
cdef index_t* __build_binary_list__(self):
|
|
cdef index_t* line_selection_p = NULL
|
|
cdef int i
|
|
cdef size_t l_selection = len(self)
|
|
|
|
line_selection_p = <index_t*> malloc((l_selection + 1) * sizeof(index_t)) # +1 for the -1 flagging the end of the array
|
|
for i in range(l_selection) :
|
|
line_selection_p[i] = self[i]
|
|
line_selection_p[l_selection] = -1 # flagging the end of the array
|
|
|
|
return line_selection_p
|
|
|
|
|
|
cpdef View materialize(self,
|
|
object view_name,
|
|
object comments={}):
|
|
|
|
cdef bytes view_name_b = tobytes(view_name)
|
|
cdef bytes comments_b
|
|
cdef Obiview_p pointer
|
|
cdef View view
|
|
|
|
if not self._view.active() :
|
|
raise OBIDeactivatedInstanceError()
|
|
|
|
comments_b = str2bytes(json.dumps(bytes2str_object(comments)))
|
|
|
|
pointer = obi_clone_view(self._view._dms.pointer(),
|
|
self._view.pointer(),
|
|
view_name_b,
|
|
self.__build_binary_list__(),
|
|
comments_b)
|
|
|
|
if pointer == NULL :
|
|
raise RuntimeError("Error : Cannot clone view %s into view %s with new line selection"
|
|
% (str(self._view.name),
|
|
bytes2str(view_name_b))
|
|
)
|
|
|
|
view = OBIWrapper.new_wrapper(type(self._view), pointer)
|
|
view._dms = self._view._dms
|
|
view._dms.register(view)
|
|
|
|
return view
|
|
|
|
|
|
#############################################################
|
|
|
|
|
|
class RollbackException(Exception):
|
|
def __init__(self, message, *views):
|
|
super(RollbackException, self).__init__(message)
|
|
for i in range(len(views)):
|
|
view = <View>(views[i])
|
|
if obi_rollback_view(<Obiview_p>(view.pointer())) < 0 :
|
|
raise Exception("Error rollbacking view")
|
|
if view.active() :
|
|
view._dms.unregister(view)
|
|
OBIWrapper.close(view)
|
|
|
|
|
|
class CantGuessTypeException(Exception):
|
|
def __init__(self, object value) :
|
|
raise Exception("Can't guess type to set value:", value)
|
|
|
|
#############################################################
|
|
|
|
|
|
cdef register_view_class(bytes view_type_name,
|
|
type view_class):
|
|
'''
|
|
Each subclass of `dms.view` needs to be registered after its declaration
|
|
'''
|
|
global __VIEW_CLASS__
|
|
|
|
assert issubclass(view_class, View)
|
|
|
|
__VIEW_CLASS__[view_type_name] = view_class
|
|
|
|
|
|
cdef register_all_view_classes() :
|
|
|
|
x = list(pkgutil.walk_packages(typed_view.__path__, prefix="obitools3.dms.view.typed_view."))
|
|
all_modules = [importlib.import_module(a[1]) for a in x]
|
|
for mod in all_modules :
|
|
getattr(mod, 'register_class')()
|
|
|
|
|
|
register_all_view_classes()
|