First version of the obi test command, testing that the OBITools3 work
correctly
This commit is contained in:
349
python/obitools3/commands/test.pyx
Normal file
349
python/obitools3/commands/test.pyx
Normal file
@ -0,0 +1,349 @@
|
||||
from obitools3.apps.progress cimport ProgressBar # TODO I absolutely don't understand why it doesn't work without that line
|
||||
from obitools3.obidms._obidms import OBIDMS # TODO cimport doesn't work
|
||||
from obitools3.utils cimport str2bytes
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import unittest
|
||||
import random
|
||||
import string
|
||||
import psutil
|
||||
|
||||
|
||||
VIEW_TYPES = [None, "NUC_SEQS_VIEW"]
|
||||
COL_TYPES = ["OBI_BOOL", "OBI_CHAR", "OBI_FLOAT", "OBI_INT", "OBI_SEQ", "OBI_STR"]
|
||||
NUC_SEQUENCE_COLUMN = "NUC_SEQ"
|
||||
ID_COLUMN = "ID"
|
||||
DEFINITION_COLUMN = "DEFINITION"
|
||||
QUALITY_COLUMN = "QUALITY"
|
||||
SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
|
||||
|
||||
|
||||
NAME_MAX_LEN = 50
|
||||
COL_COMMENTS_MAX_LEN = 2048
|
||||
MAX_NB_ELEMENTS_PER_LINE = 20
|
||||
ELEMENT_NAME_MAX_LEN = int(2048 / MAX_NB_ELEMENTS_PER_LINE)
|
||||
MAX_INT = 2147483647 # used to generate random float values
|
||||
|
||||
|
||||
__title__="Tests if the obitools are working properly"
|
||||
|
||||
|
||||
default_config = {
|
||||
}
|
||||
|
||||
|
||||
def random_length(max_len):
|
||||
return random.randint(1, max_len)
|
||||
|
||||
|
||||
def random_bool(config):
|
||||
return random.choice([True, False])
|
||||
|
||||
|
||||
def random_char(config):
|
||||
return random.choice(string.ascii_lowercase)
|
||||
|
||||
|
||||
def random_float(config):
|
||||
return random.randint(0, MAX_INT) + random.random()
|
||||
|
||||
|
||||
def random_int(config):
|
||||
return random.randint(0, config['test']['maxlinenb'])
|
||||
|
||||
|
||||
def random_seq(config):
|
||||
return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen']))))
|
||||
|
||||
|
||||
def random_str(config):
|
||||
return random_str_with_max_len(config['test']['strmaxlen'])
|
||||
|
||||
|
||||
def random_str_with_max_len(max_len):
|
||||
return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len)))
|
||||
|
||||
|
||||
def random_column(infos):
|
||||
return random.choice(list(infos['view'].get_columns()))
|
||||
|
||||
|
||||
def random_unique_name(infos):
|
||||
name = ""
|
||||
while name == "" or name in infos['unique_names'] :
|
||||
name = random_str_with_max_len(NAME_MAX_LEN)
|
||||
infos['unique_names'].append(name)
|
||||
return name
|
||||
|
||||
|
||||
def print_test(config, sentence):
|
||||
if config['test']['verbose'] :
|
||||
print(sentence)
|
||||
|
||||
|
||||
def test_set_and_get(config, infos):
|
||||
print_test(config, ">>> Set and get test")
|
||||
col_name = random_column(infos)
|
||||
col = infos['view'][col_name]
|
||||
element_names = col.get_elements_names()
|
||||
data_type = col.get_data_type()
|
||||
if data_type == "OBI_QUAL" :
|
||||
print_test(config, "-")
|
||||
return
|
||||
idx = random_int(config)
|
||||
value = infos['random_generator'][data_type](config)
|
||||
|
||||
if len(element_names) > 1 :
|
||||
elt = random.choice(element_names)
|
||||
col[idx][elt] = value
|
||||
assert col[idx][elt] == value, "Set value != gotten value"
|
||||
else:
|
||||
col[idx] = value
|
||||
assert col[idx] == value, "Set value != gotten value"
|
||||
|
||||
print_test(config, ">>> Set and get test OK")
|
||||
|
||||
|
||||
def test_add_col(config, infos):
|
||||
print_test(config, ">>> Add column test")
|
||||
# new_col = random_bool(config)
|
||||
# if new_col : TODO test adding existing column
|
||||
create_random_column(config, infos)
|
||||
# else :
|
||||
# alias = random.choice(['', random_unique_name(infos)])
|
||||
# infos['view'].add_column(column_name, version_number=-1, alias=alias, create=False)
|
||||
print_test(config, ">>> Add column test OK")
|
||||
|
||||
|
||||
def test_delete_col(config, infos):
|
||||
print_test(config, ">>> Delete column test")
|
||||
if len(list(infos['view'].get_columns())) <= 1 :
|
||||
print_test(config, "-")
|
||||
return
|
||||
col_name = random_column(infos)
|
||||
if col_name in SPECIAL_COLUMNS :
|
||||
print_test(config, "-")
|
||||
return
|
||||
infos['view'].delete_column(col_name)
|
||||
print_test(config, ">>> Delete column test OK")
|
||||
|
||||
|
||||
def test_col_alias(config, infos):
|
||||
print_test(config, ">>> Changing column alias test")
|
||||
col_name = random_column(infos)
|
||||
if col_name in SPECIAL_COLUMNS :
|
||||
print_test(config, "-")
|
||||
return
|
||||
infos['view'].change_column_alias(col_name, random_unique_name(infos))
|
||||
print_test(config, ">>> Changing column alias test OK")
|
||||
|
||||
|
||||
def test_line_selection(config, infos): # TODO
|
||||
print_test(config, ">>> Selecting line test")
|
||||
infos['view'].select_line(random.randint(0,infos['view'].get_line_count()))
|
||||
print_test(config, ">>> Selecting line test OK")
|
||||
|
||||
|
||||
def test_new_view(config, infos):
|
||||
print_test(config, ">>> New view test")
|
||||
random_new_view(config, infos)
|
||||
print_test(config, ">>> New view test OK")
|
||||
|
||||
|
||||
def random_test(config, infos):
|
||||
return random.choice(infos['tests'])(config, infos)
|
||||
|
||||
|
||||
def random_view_type():
|
||||
return random.choice(VIEW_TYPES)
|
||||
|
||||
|
||||
def random_col_type():
|
||||
return random.choice(COL_TYPES)
|
||||
|
||||
|
||||
def fill_column(config, infos, col) :
|
||||
data_type = col.get_data_type()
|
||||
element_names = col.get_elements_names()
|
||||
|
||||
if len(element_names) > 1 :
|
||||
for i in range(random_int(config)) :
|
||||
for j in range(len(element_names)) :
|
||||
col[i][element_names[j]] = infos['random_generator'][data_type](config)
|
||||
else :
|
||||
for i in range(random_int(config)) :
|
||||
col[i] = infos['random_generator'][data_type](config)
|
||||
|
||||
|
||||
def create_random_column(config, infos) :
|
||||
alias = random.choice(['', random_unique_name(infos)])
|
||||
nb_elements_per_line=random.randint(1, MAX_NB_ELEMENTS_PER_LINE)
|
||||
elements_names = []
|
||||
for i in range(nb_elements_per_line) :
|
||||
elements_names.append(random_unique_name(infos))
|
||||
name = random_unique_name(infos)
|
||||
infos['view'].add_column(name,
|
||||
alias=alias,
|
||||
type=random_col_type(),
|
||||
nb_elements_per_line=nb_elements_per_line,
|
||||
elements_names=elements_names,
|
||||
indexer_name=random.choice(['', random_unique_name(infos)]),
|
||||
comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN),
|
||||
create=True
|
||||
)
|
||||
if alias != '' :
|
||||
return infos['view'][alias]
|
||||
else :
|
||||
return infos['view'][name]
|
||||
|
||||
|
||||
def fill_view(config, infos):
|
||||
for i in range(random.randint(1, config['test']['maxinicolcount'])) :
|
||||
col = create_random_column(config, infos)
|
||||
fill_column(config, infos, col)
|
||||
|
||||
|
||||
def random_new_view(config, infos, first=False):
|
||||
v_to_clone = None
|
||||
line_selection = None
|
||||
clone = False
|
||||
quality_col = False # TODO
|
||||
if not first:
|
||||
infos['view'].save_and_close()
|
||||
v_to_clone = infos['dms'].open_view(random.choice(infos["view_names"]))
|
||||
v_type = None
|
||||
create_line_selection = random_bool(config)
|
||||
if create_line_selection :
|
||||
line_selection = []
|
||||
for i in range(random.randint(1, v_to_clone.get_line_count())) :
|
||||
line_selection.append(random.randint(0, v_to_clone.get_line_count()-1))
|
||||
else :
|
||||
v_type = random_view_type()
|
||||
infos['view'] = infos['dms'].new_view(random_unique_name(infos),
|
||||
view_to_clone=v_to_clone,
|
||||
line_selection=line_selection,
|
||||
view_type=v_type,
|
||||
comments=random_str_with_max_len(config['test']['commentsmaxlen']),
|
||||
quality_column=quality_col)
|
||||
if v_to_clone is not None :
|
||||
v_to_clone.save_and_close()
|
||||
if first :
|
||||
fill_view(config, infos)
|
||||
|
||||
|
||||
def create_test_obidms(config, infos):
|
||||
infos['dms'] = OBIDMS(config['obi']['defaultdms'])
|
||||
|
||||
|
||||
def ini_dms_and_first_view(config, infos):
|
||||
create_test_obidms(config, infos)
|
||||
random_new_view(config, infos, first=True)
|
||||
infos['view_names'] = []
|
||||
infos['view_names'].append(infos['view'].get_name())
|
||||
|
||||
|
||||
def addOptions(parser):
|
||||
|
||||
# TODO put this common group somewhere else but I don't know where
|
||||
group=parser.add_argument_group('DMS and view options')
|
||||
|
||||
group.add_argument('--default-dms','-d',
|
||||
action="store", dest="obi:defaultdms",
|
||||
metavar='<DMS NAME>',
|
||||
default="/tmp/test_dms",
|
||||
type=str,
|
||||
help="Name of the default DMS for reading and writing data. "
|
||||
"Default: /tmp/test_dms")
|
||||
|
||||
group=parser.add_argument_group('obi test specific options')
|
||||
|
||||
group.add_argument('--nb_tests','-n',
|
||||
action="store", dest="test:nbtests",
|
||||
metavar='<NB_TESTS>',
|
||||
default=1000,
|
||||
type=int,
|
||||
help="Number of tests to carry out. "
|
||||
"Default: 1000")
|
||||
|
||||
group.add_argument('--seq_max_len','-s',
|
||||
action="store", dest="test:seqmaxlen",
|
||||
metavar='<SEQ_MAX_LEN>',
|
||||
default=200,
|
||||
type=int,
|
||||
help="Maximum length of DNA sequences."
|
||||
"Default: 200")
|
||||
|
||||
group.add_argument('--str_max_len','-t',
|
||||
action="store", dest="test:strmaxlen",
|
||||
metavar='<STR_MAX_LEN>',
|
||||
default=200,
|
||||
type=int,
|
||||
help="Maximum length of character strings."
|
||||
"Default: 200")
|
||||
|
||||
group.add_argument('--comments_max_len','-c',
|
||||
action="store", dest="test:commentsmaxlen",
|
||||
metavar='<COMMENTS_MAX_LEN>',
|
||||
default=10000,
|
||||
type=int,
|
||||
help="Maximum length of view comments."
|
||||
"Default: 10000")
|
||||
|
||||
group.add_argument('--max_ini_col_count','-o',
|
||||
action="store", dest="test:maxinicolcount",
|
||||
metavar='<MAX_INI_COL_COUNT>',
|
||||
default=10,
|
||||
type=int,
|
||||
help="Maximum number of columns in the initial view."
|
||||
"Default: 10")
|
||||
|
||||
group.add_argument('--max_line_nb','-l',
|
||||
action="store", dest="test:maxlinenb",
|
||||
metavar='<MAX_LINE_NB>',
|
||||
default=10000,
|
||||
type=int,
|
||||
help="Maximum number of lines in a column."
|
||||
"Default: 10000")
|
||||
|
||||
group.add_argument('--verbose','-v',
|
||||
action="store_true", dest="test:verbose",
|
||||
default=False,
|
||||
help="Print the tests."
|
||||
"Default: Don't print the tests")
|
||||
|
||||
def run(config):
|
||||
|
||||
infos = {'dms': None,
|
||||
'view': None,
|
||||
'view_names': None,
|
||||
'unique_names': [],
|
||||
'random_generator': {"OBI_BOOL": random_bool, "OBI_CHAR": random_char, "OBI_FLOAT": random_float, "OBI_INT": random_int, "OBI_SEQ": random_seq, "OBI_STR": random_str},
|
||||
'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias] #, test_line_selection] # TODO
|
||||
}
|
||||
|
||||
print("Initializing the DMS and the first view...")
|
||||
|
||||
ini_dms_and_first_view(config, infos)
|
||||
|
||||
i = 0
|
||||
for t in range(config['test']['nbtests']):
|
||||
random_test(config, infos)
|
||||
#print(repr(infos['view']))
|
||||
i+=1
|
||||
# New view sometimes but max 100 because of issues with opened files number limit (news views and random modifications = lots of AVL files) TODO test without limit
|
||||
if (i%(config['test']['nbtests']/100)) == 0 :
|
||||
test_new_view(config, infos)
|
||||
if (i%(config['test']['nbtests']/10)) == 0 :
|
||||
print("Testing......"+str(i*100/config['test']['nbtests'])+"%")
|
||||
|
||||
#print(infos)
|
||||
|
||||
infos['view'].save_and_close()
|
||||
infos['dms'].close()
|
||||
shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
|
||||
|
||||
print("Done.")
|
||||
|
||||
|
Reference in New Issue
Block a user