163 lines
4.3 KiB
Cython
163 lines
4.3 KiB
Cython
#cython: language_level=3
|
|
|
|
|
|
from libc.stdlib cimport free
|
|
from cpython.list cimport PyList_Size
|
|
|
|
from .capi.obidms cimport obi_open_dms, \
|
|
obi_create_dms, \
|
|
obi_close_dms, \
|
|
obi_dms_exists, \
|
|
obi_dms_get_full_path
|
|
|
|
from .capi.obitypes cimport const_char_p
|
|
|
|
from obitools3.utils cimport bytes2str, \
|
|
str2bytes, \
|
|
tobytes, \
|
|
tostr
|
|
|
|
from .object cimport OBIObjectClosedInstance
|
|
|
|
from pathlib import Path
|
|
|
|
from .view import view
|
|
|
|
|
|
cdef class DMS(OBIWrapper):
    '''
    Wrapper around an OBIDMS handle.

    Besides the open/create/close life cycle, the class offers a read-only,
    mapping-like access to the views of the DMS: keys are view names
    (as bytes) and values are the objects returned by `view.View.open`.
    '''

    cdef inline OBIDMS_p pointer(self):
        # Typed access to the `_pointer` attribute of the instance.
        return <OBIDMS_p>(self._pointer)

    @staticmethod
    def open_or_new(object dms_name):
        '''
        Opens the DMS called `dms_name` if it exists, creates it otherwise.

        @param dms_name: name of the DMS, converted with `tobytes`
        @return: the wrapper built around the opened or created DMS
        @rtype: DMS
        @raise RuntimeError: if the existence of the DMS can not be checked
        @raise Exception: if the DMS can be neither opened nor created
        '''
        cdef OBIDMS_p pointer
        cdef DMS dms
        cdef bytes dms_name_b = tobytes(dms_name)

        if DMS.exists(dms_name_b):
            pointer = obi_open_dms(<const_char_p> dms_name_b)
        else:
            pointer = obi_create_dms(<const_char_p> dms_name_b)

        if pointer == NULL:
            raise Exception("Failed opening or creating an OBIDMS")

        dms = OBIWrapper.new_wrapper(DMS, pointer)
        return dms

    @staticmethod
    def exists(object dms_name):
        '''
        Tells whether the DMS called `dms_name` exists.

        @param dms_name: name of the DMS, converted with `tobytes`
        @rtype: bool
        @raise RuntimeError: if the underlying check reports an error
                             (negative return value)
        '''
        cdef bytes dms_name_b = tobytes(dms_name)
        cdef int rep

        rep = obi_dms_exists(<const_char_p> dms_name_b)

        if rep < 0:
            raise RuntimeError("Error checking if a DMS exists")

        return bool(rep)

    @staticmethod
    def new(object dms_name):
        '''
        Creates the DMS called `dms_name`.

        @param dms_name: name of the DMS, converted with `tobytes`
        @return: the wrapper built around the created DMS
        @rtype: DMS
        @raise Exception: if the creation fails (NULL pointer returned)
        '''
        cdef OBIDMS_p pointer
        cdef DMS dms
        cdef bytes dms_name_b = tobytes(dms_name)

        pointer = obi_create_dms(<const_char_p> dms_name_b)
        if pointer == NULL:
            raise Exception("Failed creating an OBIDMS")

        dms = OBIWrapper.new_wrapper(DMS, pointer)
        return dms

    @staticmethod
    def open(object dms_name):
        '''
        Opens the DMS called `dms_name`.

        @param dms_name: name of the DMS, converted with `tobytes`
        @return: the wrapper built around the opened DMS
        @rtype: DMS
        @raise Exception: if the opening fails (NULL pointer returned)
        '''
        cdef OBIDMS_p pointer
        cdef DMS dms
        cdef bytes dms_name_b = tobytes(dms_name)

        pointer = obi_open_dms(<const_char_p> dms_name_b)
        if pointer == NULL:
            raise Exception("Failed opening an OBIDMS")

        dms = OBIWrapper.new_wrapper(DMS, pointer)
        return dms

    def close(self):
        '''
        Closes the DMS instance and frees the associated memory.

        Nothing is done when the instance is not active.

        The `close` method is automatically called by the object destructor.

        @raise Exception: if the underlying DMS can not be closed
        '''
        # The handle is read before OBIWrapper.close() is called, so that it
        # can still be handed to obi_close_dms() afterwards.
        cdef OBIDMS_p pointer = self.pointer()

        if self.active():
            OBIWrapper.close(self)
            if obi_close_dms(pointer) < 0:
                raise Exception("Problem closing an OBIDMS")

    # name property getter
    @property
    def name(self):
        '''
        Returns the name of the DMS instance

        @rtype: bytes
        '''
        return <bytes> self.pointer().dms_name

    def keys(self):
        '''
        Iterates over the names of the views of the DMS.

        A view is any `*.obiview` file found in the "VIEWS" directory of
        the DMS; its name is the file name without the extension.

        @return: a generator of view names
        @rtype: generator of bytes
        @raise RuntimeError: if the path of the view directory can not be
                             retrieved
        '''
        cdef const_char_p path = obi_dms_get_full_path(self.pointer(), b"VIEWS")

        if path == NULL:
            raise RuntimeError("Cannot retrieve the view database path")

        # The C string is released even if its conversion fails.
        try:
            p = Path(bytes2str(path))
        finally:
            free(path)

        for v in p.glob("*.obiview"):
            yield str2bytes(v.stem)

    def values(self):
        '''
        Iterates over the views of the DMS, opened with `get_view`.
        '''
        cdef bytes view_name

        for view_name in self.keys():
            yield self.get_view(view_name)

    def items(self):
        '''
        Iterates over the (view name, view) pairs of the DMS.
        '''
        cdef bytes view_name

        for view_name in self.keys():
            yield (view_name, self.get_view(view_name))

    def __contains__(self, key):
        '''
        Tells whether a view called `key` exists in the DMS.

        @param key: name of the view, converted with `tostr`
        @rtype: bool
        @raise RuntimeError: if the path of the view directory can not be
                             retrieved
        '''
        # The key is converted first: a failing conversion then happens
        # before any C memory has been allocated.
        cdef str key_s = tostr(key)
        cdef const_char_p path = obi_dms_get_full_path(self.pointer(), b"VIEWS")

        if path == NULL:
            raise RuntimeError("Cannot retrieve the view database path")

        # The C string is released even if its conversion fails.
        try:
            views_dir = bytes2str(path)
        finally:
            free(path)

        # The extension is appended rather than set with `with_suffix`, which
        # would replace the last dotted part of a name such as "a.b" and
        # disagree with the names reported by `keys`.
        return Path(views_dir, key_s + ".obiview").is_file()

    cpdef int view_count(self):
        '''
        Returns the number of views of the DMS.

        @rtype: int
        '''
        cdef int count = 0

        # Counting while iterating avoids building a temporary list.
        for view_name in self.keys():
            count += 1

        return count

    def __len__(self):
        '''
        Returns the number of views of the DMS (see `view_count`).
        '''
        return self.view_count()

    def __getitem__(self, object view_name):
        '''
        Returns the view called `view_name` (see `get_view`).
        '''
        return self.get_view(view_name)

    def __iter__(self):
        '''
        Iterates over the names of the views (see `keys`).
        '''
        return self.keys()

    def get_view(self, object view_name):
        '''
        Opens the view called `view_name` of this DMS with `view.View.open`.

        @param view_name: name of the view to open
        @return: the result of `view.View.open(self, view_name)`
        '''
        return view.View.open(self, view_name)
|
|
|
|
|