Cython API: compiling but not working

This commit is contained in:
Celine Mercier
2017-03-06 16:07:02 +01:00
parent 778acc48cd
commit 381194194c
19 changed files with 1404 additions and 1531 deletions

View File

@ -1,5 +1,5 @@
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
from obitools3.obidms._obidms import OBIDMS # TODO cimport doesn't work
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
from obitools3.utils cimport bytes2str
import time

View File

@ -1,5 +1,6 @@
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
from obitools3.obidms._obidms import OBIDMS, OBIView, OBIView_line_selection # TODO cimport doesn't work
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
#, OBIView, OBIView_line_selection # TODO cimport doesn't work
from functools import reduce
import time
@ -56,34 +57,34 @@ def run(config):
d = OBIDMS(config['obi']['defaultdms'])
# Open input view 1
iview = d.open_view(config['obi']['inputview'])
# Initialize the progress bar
pb = ProgressBar(len(iview), config, seconde=5)
# Apply filter
selection = OBIView_line_selection(iview)
for i in range(len(iview)) :
pb(i)
line = iview[i]
loc_env = {'sequence': line, 'line': line} # TODO add taxonomy
good = (reduce(lambda bint x, bint y: x and y,
(bool(eval(p, loc_env, line))
for p in config['grep']['predicates']), True))
if good :
selection.append(i)
# Create output view with the line selection
oview = d.clone_view_with_line_selection(config['obi']['outputview'], selection, comments="obi grep: "+str(config['grep']['predicates'])+"\n")
#print("\n")
#print(repr(oview))
iview.close()
oview.close()
# iview = d.open_view(config['obi']['inputview'])
#
# # Initialize the progress bar
# pb = ProgressBar(len(iview), config, seconde=5)
#
# # Apply filter
# selection = OBIView_line_selection(iview)
# for i in range(len(iview)) :
# pb(i)
# line = iview[i]
#
# loc_env = {'sequence': line, 'line': line} # TODO add taxonomy
#
# good = (reduce(lambda bint x, bint y: x and y,
# (bool(eval(p, loc_env, line))
# for p in config['grep']['predicates']), True))
#
# if good :
# selection.append(i)
#
# # Create output view with the line selection
# oview = d.clone_view_with_line_selection(config['obi']['outputview'], selection, comments="obi grep: "+str(config['grep']['predicates'])+"\n")
#
# #print("\n")
# #print(repr(oview))
#
# iview.close()
# oview.close()
d.close()

View File

@ -2,7 +2,7 @@ from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
from obitools3.files.universalopener cimport uopen
from obitools3.parsers.fasta import fastaIterator
from obitools3.parsers.fastq import fastqIterator
from obitools3.obidms._obidms import OBIDMS
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
import time
@ -109,22 +109,22 @@ def run(config):
d = OBIDMS(config['obi']['defaultdms'])
# Create view
view = d.new_view(config['import']['destview'], view_type=view_type, quality_column=get_quality)
i = 0
for seq in iseq:
pb(i)
view[i].id = seq['id']
view[i].definition = seq['definition']
view[i].nuc_seq = seq['sequence']
if get_quality :
view[i].quality = seq['quality']
for tag in seq['tags'] :
view[i][tag] = seq['tags'][tag]
i+=1
#print(view.__repr__())
view.close()
# view = d.new_view(config['import']['destview'], view_type=view_type, quality_column=get_quality)
#
# i = 0
# for seq in iseq:
# pb(i)
# view[i].id = seq['id']
# view[i].definition = seq['definition']
# view[i].nuc_seq = seq['sequence']
# if get_quality :
# view[i].quality = seq['quality']
# for tag in seq['tags'] :
# view[i][tag] = seq['tags'][tag]
# i+=1
#
# #print(view.__repr__())
#
# view.close()
d.close()

View File

@ -1,11 +1,11 @@
#cython: language_level=3
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
from obitools3.obidms._obidms cimport OBIDMS # TODO cimport doesn't work
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
from obitools3.utils cimport str2bytes
from obitools3.obidms.capi.obialign cimport obi_lcs_align_one_column, \
obi_lcs_align_two_columns
from obitools3.dms.capi.obialign cimport obi_lcs_align_one_column, \
obi_lcs_align_two_columns
import time
@ -155,82 +155,82 @@ def addOptions(parser):
help="Number of threads to use for the computation. Default: one.")
cpdef align(str dms_n,
str input_view_1_n, str output_view_n,
str input_view_2_n="",
str input_column_1_n="", str input_column_2_n="",
str input_elt_1_n="", str input_elt_2_n="",
str id_column_1_n="", str id_column_2_n="",
double threshold=0.0, bint normalize=True,
int reference=0, bint similarity_mode=True,
bint print_seq=False, bint print_count=False,
comments="",
int thread_count=1) :
cdef OBIDMS d
d = OBIDMS(dms_n)
if input_view_2_n == "" and input_column_2_n == "" :
if obi_lcs_align_one_column(d._pointer, \
str2bytes(input_view_1_n), \
str2bytes(input_column_1_n), \
str2bytes(input_elt_1_n), \
str2bytes(id_column_1_n), \
str2bytes(output_view_n), \
str2bytes(comments), \
print_seq, \
print_count, \
threshold, normalize, reference, similarity_mode,
thread_count) < 0 :
raise Exception("Error aligning sequences")
else :
if obi_lcs_align_two_columns(d._pointer, \
str2bytes(input_view_1_n), \
str2bytes(input_view_2_n), \
str2bytes(input_column_1_n), \
str2bytes(input_column_2_n), \
str2bytes(input_elt_1_n), \
str2bytes(input_elt_2_n), \
str2bytes(id_column_1_n), \
str2bytes(id_column_2_n), \
str2bytes(output_view_n), \
str2bytes(comments), \
print_seq, \
print_count, \
threshold, normalize, reference, similarity_mode) < 0 :
raise Exception("Error aligning sequences")
d.close()
def run(config):
# TODO: Build formatted comments with all parameters etc
comments = "Obi align"
# Call cython alignment function
align(config['obi']['defaultdms'], \
config['obi']['inputview1'], \
config['obi']['outputview'], \
input_view_2_n = config['obi']['inputview2'], \
input_column_1_n = config['obi']['inputcolumn1'], \
input_column_2_n = config['obi']['inputcolumn2'], \
input_elt_1_n = config['obi']['inputelement1'], \
input_elt_2_n = config['obi']['inputelement2'], \
id_column_1_n = config['obi']['idcolumn1'], \
id_column_2_n = config['obi']['idcolumn2'], \
threshold = config['align']['threshold'], \
normalize = config['align']['normalize'], \
reference = config['align']['reflength'], \
similarity_mode = config['align']['similarity'], \
print_seq = config['align']['printseq'], \
print_count = config['align']['printcount'], \
comments = comments, \
thread_count = config['align']['threadcount'])
print("Done.")
# cpdef align(str dms_n,
# str input_view_1_n, str output_view_n,
# str input_view_2_n="",
# str input_column_1_n="", str input_column_2_n="",
# str input_elt_1_n="", str input_elt_2_n="",
# str id_column_1_n="", str id_column_2_n="",
# double threshold=0.0, bint normalize=True,
# int reference=0, bint similarity_mode=True,
# bint print_seq=False, bint print_count=False,
# comments="",
# int thread_count=1) :
#
# cdef OBIDMS d
# d = OBIDMS(dms_n)
#
# if input_view_2_n == "" and input_column_2_n == "" :
# if obi_lcs_align_one_column(d._pointer, \
# str2bytes(input_view_1_n), \
# str2bytes(input_column_1_n), \
# str2bytes(input_elt_1_n), \
# str2bytes(id_column_1_n), \
# str2bytes(output_view_n), \
# str2bytes(comments), \
# print_seq, \
# print_count, \
# threshold, normalize, reference, similarity_mode,
# thread_count) < 0 :
# raise Exception("Error aligning sequences")
# else :
# if obi_lcs_align_two_columns(d._pointer, \
# str2bytes(input_view_1_n), \
# str2bytes(input_view_2_n), \
# str2bytes(input_column_1_n), \
# str2bytes(input_column_2_n), \
# str2bytes(input_elt_1_n), \
# str2bytes(input_elt_2_n), \
# str2bytes(id_column_1_n), \
# str2bytes(id_column_2_n), \
# str2bytes(output_view_n), \
# str2bytes(comments), \
# print_seq, \
# print_count, \
# threshold, normalize, reference, similarity_mode) < 0 :
# raise Exception("Error aligning sequences")
#
# d.close()
#
#
# def run(config):
#
# # TODO: Build formatted comments with all parameters etc
# comments = "Obi align"
#
# # Call cython alignment function
# align(config['obi']['defaultdms'], \
# config['obi']['inputview1'], \
# config['obi']['outputview'], \
# input_view_2_n = config['obi']['inputview2'], \
# input_column_1_n = config['obi']['inputcolumn1'], \
# input_column_2_n = config['obi']['inputcolumn2'], \
# input_elt_1_n = config['obi']['inputelement1'], \
# input_elt_2_n = config['obi']['inputelement2'], \
# id_column_1_n = config['obi']['idcolumn1'], \
# id_column_2_n = config['obi']['idcolumn2'], \
# threshold = config['align']['threshold'], \
# normalize = config['align']['normalize'], \
# reference = config['align']['reflength'], \
# similarity_mode = config['align']['similarity'], \
# print_seq = config['align']['printseq'], \
# print_count = config['align']['printcount'], \
# comments = comments, \
# thread_count = config['align']['threadcount'])
#
# print("Done.")
#
#
#
#
#

View File

@ -1,383 +1,383 @@
from obitools3.apps.progress cimport ProgressBar # TODO I absolutely don't understand why it doesn't work without that line
from obitools3.obidms._obidms import OBIDMS # TODO cimport doesn't work
from obitools3.obidms._obidms import OBIView_line_selection
from obitools3.utils cimport str2bytes
import shutil
import string
import random
VIEW_TYPES = ["", "NUC_SEQS_VIEW"]
COL_TYPES = ["OBI_BOOL", "OBI_CHAR", "OBI_FLOAT", "OBI_INT", "OBI_SEQ", "OBI_STR"]
NUC_SEQUENCE_COLUMN = "NUC_SEQ"
ID_COLUMN = "ID"
DEFINITION_COLUMN = "DEFINITION"
QUALITY_COLUMN = "QUALITY"
SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
NAME_MAX_LEN = 200
COL_COMMENTS_MAX_LEN = 2048
MAX_INT = 2147483647 # used to generate random float values
__title__="Tests if the obitools are working properly"
default_config = {
}
def random_length(max_len):
return random.randint(1, max_len)
def random_bool(config):
return random.choice([True, False])
def random_char(config):
return random.choice(string.ascii_lowercase)
def random_float(config):
return random.randint(0, MAX_INT) + random.random()
def random_int(config):
return random.randint(0, config['test']['maxlinenb'])
def random_seq(config):
return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen']))))
def random_str(config):
return random_str_with_max_len(config['test']['strmaxlen'])
def random_str_with_max_len(max_len):
return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len)))
def random_column(infos):
return random.choice(sorted(list(infos['view'].columns)))
def random_unique_name(infos):
name = ""
while name == "" or name in infos['unique_names'] :
name = random_str_with_max_len(NAME_MAX_LEN)
infos['unique_names'].append(name)
return name
def random_unique_element_name(config, infos):
name = ""
while name == "" or name in infos['unique_names'] :
name = random_str_with_max_len(config['test']['elt_name_max_len'])
infos['unique_names'].append(name)
return name
def print_test(config, sentence):
if config['test']['verbose'] :
print(sentence)
def test_set_and_get(config, infos):
print_test(config, ">>> Set and get test")
col_name = random_column(infos)
col = infos['view'][col_name]
element_names = col.elements_names
data_type = col.data_type
if data_type == "OBI_QUAL" :
print_test(config, "-")
return
idx = random_int(config)
value = infos['random_generator'][data_type](config)
if col.nb_elements_per_line > 1 :
elt = random.choice(element_names)
col[idx][elt] = value
assert col[idx][elt] == value, "Set value != gotten value "+str(col[idx][elt])+" != "+str(value)
else:
col[idx] = value
assert col[idx] == value, "Set value != gotten value "+str(col[idx])+" != "+str(value)
print_test(config, ">>> Set and get test OK")
def test_add_col(config, infos):
print_test(config, ">>> Add column test")
#existing_col = random_bool(config) # TODO doesn't work because of line count problem. See obiview.c line 1737
#if existing_col and infos["view_names"] != [] :
# random_view = infos['dms'].open_view(random.choice(infos["view_names"]))
# random_column = random_view[random.choice(sorted(list(random_view.columns))]
# random_column_refs = random_column.refs
# if random_column_refs['name'] in infos['view'] :
# alias = random_unique_name(infos)
# else :
# alias = ''
# infos['view'].add_column(random_column_refs['name'], version_number=random_column_refs['version'], alias=alias, create=False)
# random_view.close()
#else :
create_random_column(config, infos)
print_test(config, ">>> Add column test OK")
def test_delete_col(config, infos):
print_test(config, ">>> Delete column test")
if len(list(infos['view'].columns)) <= 1 :
print_test(config, "-")
return
col_name = random_column(infos)
if col_name in SPECIAL_COLUMNS :
print_test(config, "-")
return
infos['view'].delete_column(col_name)
print_test(config, ">>> Delete column test OK")
def test_col_alias(config, infos):
print_test(config, ">>> Changing column alias test")
col_name = random_column(infos)
if col_name in SPECIAL_COLUMNS :
print_test(config, "-")
return
infos['view'][col_name].alias = random_unique_name(infos)
print_test(config, ">>> Changing column alias test OK")
def test_new_view(config, infos):
print_test(config, ">>> New view test")
random_new_view(config, infos)
print_test(config, ">>> New view test OK")
def random_test(config, infos):
return random.choice(infos['tests'])(config, infos)
def random_view_type():
return random.choice(VIEW_TYPES)
def random_col_type():
return random.choice(COL_TYPES)
def fill_column(config, infos, col) :
data_type = col.data_type
element_names = col.elements_names
if len(element_names) > 1 :
for i in range(random_int(config)) :
for j in range(len(element_names)) :
col[i][element_names[j]] = infos['random_generator'][data_type](config)
else :
for i in range(random_int(config)) :
col[i] = infos['random_generator'][data_type](config)
def create_random_column(config, infos) :
alias = random.choice(['', random_unique_name(infos)])
nb_elements_per_line=random.randint(1, config['test']['maxelts'])
elements_names = []
for i in range(nb_elements_per_line) :
elements_names.append(random_unique_element_name(config, infos))
elements_names = random.choice([None, elements_names])
name = random_unique_name(infos)
infos['view'].add_column(name,
alias=alias,
type=random_col_type(),
nb_elements_per_line=nb_elements_per_line,
elements_names=elements_names,
indexer_name=random.choice(['', random_unique_name(infos)]),
comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN),
create=True
)
if alias != '' :
return infos['view'][alias]
else :
return infos['view'][name]
def fill_view(config, infos):
for i in range(random.randint(1, config['test']['maxinicolcount'])) :
col = create_random_column(config, infos)
fill_column(config, infos, col)
def random_new_view(config, infos, first=False):
v_to_clone = None
line_selection = None
quality_col = False # TODO
if not first:
infos['view_names'].append(infos['view'].name)
infos['view'].close()
v_to_clone = infos['dms'].open_view(random.choice(infos["view_names"]))
v_type = ""
print_test(config, "View to clone: ")
print_test(config, repr(v_to_clone))
create_line_selection = random_bool(config)
if create_line_selection and v_to_clone.line_count > 0:
print_test(config, "New view with new line selection.")
line_selection = OBIView_line_selection(v_to_clone)
for i in range(random.randint(1, v_to_clone.line_count)) :
line_selection.append(random.randint(0, v_to_clone.line_count-1))
#print_test(config, "New line selection: "+str(line_selection))
else :
v_type = random_view_type()
if line_selection is not None :
infos['view'] = infos['dms'].clone_view_with_line_selection(random_unique_name(infos), line_selection, comments=random_str_with_max_len(config['test']['commentsmaxlen']))
elif v_to_clone is not None :
infos['view'] = infos['dms'].clone_view(random_unique_name(infos), v_to_clone, comments=random_str_with_max_len(config['test']['commentsmaxlen']))
else :
infos['view'] = infos['dms'].new_view(random_unique_name(infos), view_type=v_type, comments=random_str_with_max_len(config['test']['commentsmaxlen']), quality_column=quality_col)
print_test(config, repr(infos['view']))
if v_to_clone is not None :
if line_selection is None:
assert v_to_clone.line_count == infos['view'].line_count, "New view and cloned view don't have the same line count"
else :
assert len(line_selection) == infos['view'].line_count, "New view with new line selection does not have the right line count"
v_to_clone.close()
if first :
fill_view(config, infos)
def create_test_obidms(config, infos):
infos['dms'] = OBIDMS(config['obi']['defaultdms'])
def ini_dms_and_first_view(config, infos):
create_test_obidms(config, infos)
random_new_view(config, infos, first=True)
infos['view_names'] = []
def addOptions(parser):
# TODO put this common group somewhere else but I don't know where
group=parser.add_argument_group('DMS and view options')
group.add_argument('--default-dms','-d',
action="store", dest="obi:defaultdms",
metavar='<DMS NAME>',
default="/tmp/test_dms",
type=str,
help="Name of the default DMS for reading and writing data. "
"Default: /tmp/test_dms")
group=parser.add_argument_group('obi test specific options')
group.add_argument('--nb_tests','-n',
action="store", dest="test:nbtests",
metavar='<NB_TESTS>',
default=1000,
type=int,
help="Number of tests to carry out. "
"Default: 1000")
group.add_argument('--seq_max_len','-s',
action="store", dest="test:seqmaxlen",
metavar='<SEQ_MAX_LEN>',
default=200,
type=int,
help="Maximum length of DNA sequences. "
"Default: 200")
group.add_argument('--str_max_len','-t',
action="store", dest="test:strmaxlen",
metavar='<STR_MAX_LEN>',
default=200,
type=int,
help="Maximum length of character strings. "
"Default: 200")
group.add_argument('--comments_max_len','-c',
action="store", dest="test:commentsmaxlen",
metavar='<COMMENTS_MAX_LEN>',
default=10000,
type=int,
help="Maximum length of view comments. "
"Default: 10000")
group.add_argument('--max_ini_col_count','-o',
action="store", dest="test:maxinicolcount",
metavar='<MAX_INI_COL_COUNT>',
default=10,
type=int,
help="Maximum number of columns in the initial view. "
"Default: 10")
group.add_argument('--max_line_nb','-l',
action="store", dest="test:maxlinenb",
metavar='<MAX_LINE_NB>',
default=10000,
type=int,
help="Maximum number of lines in a column. "
"Default: 10000")
group.add_argument('--max_elts_per_line','-e',
action="store", dest="test:maxelts",
metavar='<MAX_ELTS_PER_LINE>',
default=20,
type=int,
help="Maximum number of elements per line in a column. "
"Default: 20")
group.add_argument('--verbose','-v',
action="store_true", dest="test:verbose",
default=False,
help="Print the tests. "
"Default: Don't print the tests")
group.add_argument('--seed','-g',
action="store", dest="test:seed",
metavar='<SEED>',
default=None,
help="Seed (use for reproducible tests). "
"Default: Seed is determined by Python")
def run(config):
if 'seed' in config['test'] :
random.seed(config['test']['seed'])
infos = {'dms': None,
'view': None,
'view_names': None,
'unique_names': [],
'random_generator': {"OBI_BOOL": random_bool, "OBI_CHAR": random_char, "OBI_FLOAT": random_float, "OBI_INT": random_int, "OBI_SEQ": random_seq, "OBI_STR": random_str},
'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias, test_new_view]
}
config['test']['elt_name_max_len'] = int((COL_COMMENTS_MAX_LEN - config['test']['maxelts']) / config['test']['maxelts'])
print("Initializing the DMS and the first view...")
shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
ini_dms_and_first_view(config, infos)
print_test(config, repr(infos['view']))
i = 0
for t in range(config['test']['nbtests']):
random_test(config, infos)
print_test(config, repr(infos['view']))
i+=1
if (i%(config['test']['nbtests']/10)) == 0 :
print("Testing......"+str(i*100/config['test']['nbtests'])+"%")
#print(infos)
infos['view'].close()
infos['dms'].close()
shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
print("Done.")
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
# from obitools3.obidms._obidms import OBIView_line_selection
# from obitools3.utils cimport str2bytes
#
# import shutil
# import string
# import random
#
#
# VIEW_TYPES = ["", "NUC_SEQS_VIEW"]
# COL_TYPES = ["OBI_BOOL", "OBI_CHAR", "OBI_FLOAT", "OBI_INT", "OBI_SEQ", "OBI_STR"]
# NUC_SEQUENCE_COLUMN = "NUC_SEQ"
# ID_COLUMN = "ID"
# DEFINITION_COLUMN = "DEFINITION"
# QUALITY_COLUMN = "QUALITY"
# SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
#
#
# NAME_MAX_LEN = 200
# COL_COMMENTS_MAX_LEN = 2048
# MAX_INT = 2147483647 # used to generate random float values
#
#
# __title__="Tests if the obitools are working properly"
#
#
# default_config = {
# }
#
#
# def random_length(max_len):
# return random.randint(1, max_len)
#
#
# def random_bool(config):
# return random.choice([True, False])
#
#
# def random_char(config):
# return random.choice(string.ascii_lowercase)
#
#
# def random_float(config):
# return random.randint(0, MAX_INT) + random.random()
#
#
# def random_int(config):
# return random.randint(0, config['test']['maxlinenb'])
#
#
# def random_seq(config):
# return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen']))))
#
#
# def random_str(config):
# return random_str_with_max_len(config['test']['strmaxlen'])
#
#
# def random_str_with_max_len(max_len):
# return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len)))
#
#
# def random_column(infos):
# return random.choice(sorted(list(infos['view'].columns)))
#
#
# def random_unique_name(infos):
# name = ""
# while name == "" or name in infos['unique_names'] :
# name = random_str_with_max_len(NAME_MAX_LEN)
# infos['unique_names'].append(name)
# return name
#
#
# def random_unique_element_name(config, infos):
# name = ""
# while name == "" or name in infos['unique_names'] :
# name = random_str_with_max_len(config['test']['elt_name_max_len'])
# infos['unique_names'].append(name)
# return name
#
#
# def print_test(config, sentence):
# if config['test']['verbose'] :
# print(sentence)
#
#
# def test_set_and_get(config, infos):
# print_test(config, ">>> Set and get test")
# col_name = random_column(infos)
# col = infos['view'][col_name]
# element_names = col.elements_names
# data_type = col.data_type
# if data_type == "OBI_QUAL" :
# print_test(config, "-")
# return
# idx = random_int(config)
# value = infos['random_generator'][data_type](config)
# if col.nb_elements_per_line > 1 :
# elt = random.choice(element_names)
# col[idx][elt] = value
# assert col[idx][elt] == value, "Set value != gotten value "+str(col[idx][elt])+" != "+str(value)
# else:
# col[idx] = value
# assert col[idx] == value, "Set value != gotten value "+str(col[idx])+" != "+str(value)
#
# print_test(config, ">>> Set and get test OK")
#
#
# def test_add_col(config, infos):
# print_test(config, ">>> Add column test")
# #existing_col = random_bool(config) # TODO doesn't work because of line count problem. See obiview.c line 1737
# #if existing_col and infos["view_names"] != [] :
# # random_view = infos['dms'].open_view(random.choice(infos["view_names"]))
# # random_column = random_view[random.choice(sorted(list(random_view.columns))]
# # random_column_refs = random_column.refs
# # if random_column_refs['name'] in infos['view'] :
# # alias = random_unique_name(infos)
# # else :
# # alias = ''
# # infos['view'].add_column(random_column_refs['name'], version_number=random_column_refs['version'], alias=alias, create=False)
# # random_view.close()
# #else :
# create_random_column(config, infos)
# print_test(config, ">>> Add column test OK")
#
#
# def test_delete_col(config, infos):
# print_test(config, ">>> Delete column test")
# if len(list(infos['view'].columns)) <= 1 :
# print_test(config, "-")
# return
# col_name = random_column(infos)
# if col_name in SPECIAL_COLUMNS :
# print_test(config, "-")
# return
# infos['view'].delete_column(col_name)
# print_test(config, ">>> Delete column test OK")
#
#
# def test_col_alias(config, infos):
# print_test(config, ">>> Changing column alias test")
# col_name = random_column(infos)
# if col_name in SPECIAL_COLUMNS :
# print_test(config, "-")
# return
# infos['view'][col_name].alias = random_unique_name(infos)
# print_test(config, ">>> Changing column alias test OK")
#
#
# def test_new_view(config, infos):
# print_test(config, ">>> New view test")
# random_new_view(config, infos)
# print_test(config, ">>> New view test OK")
#
#
# def random_test(config, infos):
# return random.choice(infos['tests'])(config, infos)
#
#
# def random_view_type():
# return random.choice(VIEW_TYPES)
#
#
# def random_col_type():
# return random.choice(COL_TYPES)
#
#
# def fill_column(config, infos, col) :
# data_type = col.data_type
# element_names = col.elements_names
#
# if len(element_names) > 1 :
# for i in range(random_int(config)) :
# for j in range(len(element_names)) :
# col[i][element_names[j]] = infos['random_generator'][data_type](config)
# else :
# for i in range(random_int(config)) :
# col[i] = infos['random_generator'][data_type](config)
#
#
# def create_random_column(config, infos) :
# alias = random.choice(['', random_unique_name(infos)])
# nb_elements_per_line=random.randint(1, config['test']['maxelts'])
# elements_names = []
# for i in range(nb_elements_per_line) :
# elements_names.append(random_unique_element_name(config, infos))
# elements_names = random.choice([None, elements_names])
# name = random_unique_name(infos)
# infos['view'].add_column(name,
# alias=alias,
# type=random_col_type(),
# nb_elements_per_line=nb_elements_per_line,
# elements_names=elements_names,
# indexer_name=random.choice(['', random_unique_name(infos)]),
# comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN),
# create=True
# )
# if alias != '' :
# return infos['view'][alias]
# else :
# return infos['view'][name]
#
#
# def fill_view(config, infos):
# for i in range(random.randint(1, config['test']['maxinicolcount'])) :
# col = create_random_column(config, infos)
# fill_column(config, infos, col)
#
#
# def random_new_view(config, infos, first=False):
# v_to_clone = None
# line_selection = None
# quality_col = False # TODO
# if not first:
# infos['view_names'].append(infos['view'].name)
# infos['view'].close()
# v_to_clone = infos['dms'].open_view(random.choice(infos["view_names"]))
# v_type = ""
# print_test(config, "View to clone: ")
# print_test(config, repr(v_to_clone))
# create_line_selection = random_bool(config)
# if create_line_selection and v_to_clone.line_count > 0:
# print_test(config, "New view with new line selection.")
# line_selection = OBIView_line_selection(v_to_clone)
# for i in range(random.randint(1, v_to_clone.line_count)) :
# line_selection.append(random.randint(0, v_to_clone.line_count-1))
# #print_test(config, "New line selection: "+str(line_selection))
# else :
# v_type = random_view_type()
#
# if line_selection is not None :
# infos['view'] = infos['dms'].clone_view_with_line_selection(random_unique_name(infos), line_selection, comments=random_str_with_max_len(config['test']['commentsmaxlen']))
# elif v_to_clone is not None :
# infos['view'] = infos['dms'].clone_view(random_unique_name(infos), v_to_clone, comments=random_str_with_max_len(config['test']['commentsmaxlen']))
# else :
# infos['view'] = infos['dms'].new_view(random_unique_name(infos), view_type=v_type, comments=random_str_with_max_len(config['test']['commentsmaxlen']), quality_column=quality_col)
#
# print_test(config, repr(infos['view']))
# if v_to_clone is not None :
# if line_selection is None:
# assert v_to_clone.line_count == infos['view'].line_count, "New view and cloned view don't have the same line count"
# else :
# assert len(line_selection) == infos['view'].line_count, "New view with new line selection does not have the right line count"
# v_to_clone.close()
# if first :
# fill_view(config, infos)
#
#
# def create_test_obidms(config, infos):
# infos['dms'] = OBIDMS(config['obi']['defaultdms'])
#
#
# def ini_dms_and_first_view(config, infos):
# create_test_obidms(config, infos)
# random_new_view(config, infos, first=True)
# infos['view_names'] = []
#
#
# def addOptions(parser):
#
# # TODO put this common group somewhere else but I don't know where
# group=parser.add_argument_group('DMS and view options')
#
# group.add_argument('--default-dms','-d',
# action="store", dest="obi:defaultdms",
# metavar='<DMS NAME>',
# default="/tmp/test_dms",
# type=str,
# help="Name of the default DMS for reading and writing data. "
# "Default: /tmp/test_dms")
#
# group=parser.add_argument_group('obi test specific options')
#
# group.add_argument('--nb_tests','-n',
# action="store", dest="test:nbtests",
# metavar='<NB_TESTS>',
# default=1000,
# type=int,
# help="Number of tests to carry out. "
# "Default: 1000")
#
# group.add_argument('--seq_max_len','-s',
# action="store", dest="test:seqmaxlen",
# metavar='<SEQ_MAX_LEN>',
# default=200,
# type=int,
# help="Maximum length of DNA sequences. "
# "Default: 200")
#
# group.add_argument('--str_max_len','-t',
# action="store", dest="test:strmaxlen",
# metavar='<STR_MAX_LEN>',
# default=200,
# type=int,
# help="Maximum length of character strings. "
# "Default: 200")
#
# group.add_argument('--comments_max_len','-c',
# action="store", dest="test:commentsmaxlen",
# metavar='<COMMENTS_MAX_LEN>',
# default=10000,
# type=int,
# help="Maximum length of view comments. "
# "Default: 10000")
#
# group.add_argument('--max_ini_col_count','-o',
# action="store", dest="test:maxinicolcount",
# metavar='<MAX_INI_COL_COUNT>',
# default=10,
# type=int,
# help="Maximum number of columns in the initial view. "
# "Default: 10")
#
# group.add_argument('--max_line_nb','-l',
# action="store", dest="test:maxlinenb",
# metavar='<MAX_LINE_NB>',
# default=10000,
# type=int,
# help="Maximum number of lines in a column. "
# "Default: 10000")
#
# group.add_argument('--max_elts_per_line','-e',
# action="store", dest="test:maxelts",
# metavar='<MAX_ELTS_PER_LINE>',
# default=20,
# type=int,
# help="Maximum number of elements per line in a column. "
# "Default: 20")
#
# group.add_argument('--verbose','-v',
# action="store_true", dest="test:verbose",
# default=False,
# help="Print the tests. "
# "Default: Don't print the tests")
#
# group.add_argument('--seed','-g',
# action="store", dest="test:seed",
# metavar='<SEED>',
# default=None,
# help="Seed (use for reproducible tests). "
# "Default: Seed is determined by Python")
#
# def run(config):
#
# if 'seed' in config['test'] :
# random.seed(config['test']['seed'])
#
# infos = {'dms': None,
# 'view': None,
# 'view_names': None,
# 'unique_names': [],
# 'random_generator': {"OBI_BOOL": random_bool, "OBI_CHAR": random_char, "OBI_FLOAT": random_float, "OBI_INT": random_int, "OBI_SEQ": random_seq, "OBI_STR": random_str},
# 'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias, test_new_view]
# }
#
# config['test']['elt_name_max_len'] = int((COL_COMMENTS_MAX_LEN - config['test']['maxelts']) / config['test']['maxelts'])
#
# print("Initializing the DMS and the first view...")
#
# shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
#
# ini_dms_and_first_view(config, infos)
# print_test(config, repr(infos['view']))
#
# i = 0
# for t in range(config['test']['nbtests']):
# random_test(config, infos)
# print_test(config, repr(infos['view']))
# i+=1
# if (i%(config['test']['nbtests']/10)) == 0 :
# print("Testing......"+str(i*100/config['test']['nbtests'])+"%")
#
# #print(infos)
#
# infos['view'].close()
# infos['dms'].close()
# shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
#
# print("Done.")

View File

@ -1,96 +0,0 @@
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
from obitools3.obidms._obidms import OBIDMS, OBIView, OBIView_line_selection # TODO cimport doesn't work
from functools import reduce
import time
__title__="Grep view lines that match the given predicates"
default_config = { 'inputview' : None,
'outputview' : None
}
def addOptions(parser):
# TODO put this common group somewhere else but I don't know where
group=parser.add_argument_group('DMS and view options')
group.add_argument('--default-dms','-d',
action="store", dest="obi:defaultdms",
metavar='<DMS NAME>',
default=None,
type=str,
help="Name of the default DMS for reading and writing data.")
group.add_argument('--input-view','-i',
action="store", dest="obi:inputview",
metavar='<INPUT VIEW NAME>',
default=None,
type=str,
help="Name of the input view, either raw if the view is in the default DMS,"
" or in the form 'dms:view' if it is in another DMS.")
group.add_argument('--output-view','-o',
action="store", dest="obi:outputview",
metavar='<OUTPUT VIEW NAME>',
default=None,
type=str,
help="Name of the output view, either raw if the view is in the default DMS,"
" or in the form 'dms:view' if it is in another DMS.")
group=parser.add_argument_group('obi grep specific options')
group.add_argument('--predicate','-p',
action="append", dest="grep:predicates",
metavar='<PREDICATE>',
default=None,
type=str,
help="Grep lines that match the given python expression on <line> or <sequence>.")
def run(config):
# Open DMS
d = OBIDMS(config['obi']['defaultdms'])
# Open input view 1
iview = d.open_view(config['obi']['inputview'])
# Initialize the progress bar
pb = ProgressBar(len(iview), config, seconde=5)
# Apply filter
selection = OBIView_line_selection(iview)
for i in range(len(iview)) :
pb(i)
line = iview[i]
loc_env = {'sequence': line, 'line': line} # TODO add taxonomy
good = (reduce(lambda bint x, bint y: x and y,
(bool(eval(p, loc_env, line))
for p in config['grep']['predicates']), True))
if good :
selection.append(i)
# Create output view with the line selection
oview = d.new_view(config['obi']['outputview'], line_selection=selection, comments="obi grep: "+str(config['grep']['predicates'])+"\n")
#print("\n")
#print(repr(oview))
iview.close()
oview.close()
d.close()