diff --git a/python/obitools3/commands/test.pyx b/python/obitools3/commands/test.pyx new file mode 100644 index 0000000..9f3cfce --- /dev/null +++ b/python/obitools3/commands/test.pyx @@ -0,0 +1,349 @@ +from obitools3.apps.progress cimport ProgressBar # TODO I absolutely don't understand why it doesn't work without that line +from obitools3.obidms._obidms import OBIDMS # TODO cimport doesn't work +from obitools3.utils cimport str2bytes + +import os +import sys +import shutil +import unittest +import random +import string +import psutil + + +VIEW_TYPES = [None, "NUC_SEQS_VIEW"] +COL_TYPES = ["OBI_BOOL", "OBI_CHAR", "OBI_FLOAT", "OBI_INT", "OBI_SEQ", "OBI_STR"] +NUC_SEQUENCE_COLUMN = "NUC_SEQ" +ID_COLUMN = "ID" +DEFINITION_COLUMN = "DEFINITION" +QUALITY_COLUMN = "QUALITY" +SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN] + + +NAME_MAX_LEN = 50 +COL_COMMENTS_MAX_LEN = 2048 +MAX_NB_ELEMENTS_PER_LINE = 20 +ELEMENT_NAME_MAX_LEN = int(2048 / MAX_NB_ELEMENTS_PER_LINE) +MAX_INT = 2147483647 # used to generate random float values + + +__title__="Tests if the obitools are working properly" + + +default_config = { + } + + +def random_length(max_len): + return random.randint(1, max_len) + + +def random_bool(config): + return random.choice([True, False]) + + +def random_char(config): + return random.choice(string.ascii_lowercase) + + +def random_float(config): + return random.randint(0, MAX_INT) + random.random() + + +def random_int(config): + return random.randint(0, config['test']['maxlinenb']) + + +def random_seq(config): + return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen'])))) + + +def random_str(config): + return random_str_with_max_len(config['test']['strmaxlen']) + + +def random_str_with_max_len(max_len): + return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len))) + + +def random_column(infos): + return random.choice(list(infos['view'].get_columns())) + + +def random_unique_name(infos): + name = "" + while name == "" or name in infos['unique_names'] : + name = random_str_with_max_len(NAME_MAX_LEN) + infos['unique_names'].append(name) + return name + + +def print_test(config, sentence): + if config['test']['verbose'] : + print(sentence) + + +def test_set_and_get(config, infos): + print_test(config, ">>> Set and get test") + col_name = random_column(infos) + col = infos['view'][col_name] + element_names = col.get_elements_names() + data_type = col.get_data_type() + if data_type == "OBI_QUAL" : + print_test(config, "-") + return + idx = random_int(config) + value = infos['random_generator'][data_type](config) + + if len(element_names) > 1 : + elt = random.choice(element_names) + col[idx][elt] = value + assert col[idx][elt] == value, "Set value != gotten value" + else: + col[idx] = value + assert col[idx] == value, "Set value != gotten value" + + print_test(config, ">>> Set and get test OK") + + +def test_add_col(config, infos): + print_test(config, ">>> Add column test") +# new_col = random_bool(config) +# if new_col : TODO test adding existing column + create_random_column(config, infos) +# else : +# alias = random.choice(['', random_unique_name(infos)]) +# infos['view'].add_column(column_name, version_number=-1, alias=alias, create=False) + print_test(config, ">>> Add column test OK") + + +def test_delete_col(config, infos): + print_test(config, ">>> Delete column test") + if len(list(infos['view'].get_columns())) <= 1 : + print_test(config, "-") + return + col_name = random_column(infos) + if col_name in SPECIAL_COLUMNS : + print_test(config, "-") + return + infos['view'].delete_column(col_name) + print_test(config, ">>> Delete column test OK") + + +def test_col_alias(config, infos): + print_test(config, ">>> Changing column alias test") + col_name = random_column(infos) + if col_name in SPECIAL_COLUMNS : + print_test(config, "-") + return + infos['view'].change_column_alias(col_name, random_unique_name(infos)) + print_test(config, ">>> Changing column alias test OK") + + +def test_line_selection(config, infos): # TODO + print_test(config, ">>> Selecting line test") + infos['view'].select_line(random.randint(0,infos['view'].get_line_count())) + print_test(config, ">>> Selecting line test OK") + + +def test_new_view(config, infos): + print_test(config, ">>> New view test") + random_new_view(config, infos) + print_test(config, ">>> New view test OK") + + +def random_test(config, infos): + return random.choice(infos['tests'])(config, infos) + + +def random_view_type(): + return random.choice(VIEW_TYPES) + + +def random_col_type(): + return random.choice(COL_TYPES) + + +def fill_column(config, infos, col) : + data_type = col.get_data_type() + element_names = col.get_elements_names() + + if len(element_names) > 1 : + for i in range(random_int(config)) : + for j in range(len(element_names)) : + col[i][element_names[j]] = infos['random_generator'][data_type](config) + else : + for i in range(random_int(config)) : + col[i] = infos['random_generator'][data_type](config) + + +def create_random_column(config, infos) : + alias = random.choice(['', random_unique_name(infos)]) + nb_elements_per_line=random.randint(1, MAX_NB_ELEMENTS_PER_LINE) + elements_names = [] + for i in range(nb_elements_per_line) : + elements_names.append(random_unique_name(infos)) + name = random_unique_name(infos) + infos['view'].add_column(name, + alias=alias, + type=random_col_type(), + nb_elements_per_line=nb_elements_per_line, + elements_names=elements_names, + indexer_name=random.choice(['', random_unique_name(infos)]), + comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN), + create=True + ) + if alias != '' : + return infos['view'][alias] + else : + return infos['view'][name] + + +def fill_view(config, infos): + for i in range(random.randint(1, config['test']['maxinicolcount'])) : + col = create_random_column(config, infos) + fill_column(config, infos, col) + + +def random_new_view(config, infos, first=False): + v_to_clone = None + line_selection = None + clone = False + quality_col = False # TODO + if not first: + infos['view'].save_and_close() + v_to_clone = infos['dms'].open_view(random.choice(infos["view_names"])) + v_type = None + create_line_selection = random_bool(config) + if create_line_selection : + line_selection = [] + for i in range(random.randint(1, v_to_clone.get_line_count())) : + line_selection.append(random.randint(0, v_to_clone.get_line_count()-1)) + else : + v_type = random_view_type() + infos['view'] = infos['dms'].new_view(random_unique_name(infos), + view_to_clone=v_to_clone, + line_selection=line_selection, + view_type=v_type, + comments=random_str_with_max_len(config['test']['commentsmaxlen']), + quality_column=quality_col) + if v_to_clone is not None : + v_to_clone.save_and_close() + if first : + fill_view(config, infos) + + +def create_test_obidms(config, infos): + infos['dms'] = OBIDMS(config['obi']['defaultdms']) + + +def ini_dms_and_first_view(config, infos): + create_test_obidms(config, infos) + random_new_view(config, infos, first=True) + infos['view_names'] = [] + infos['view_names'].append(infos['view'].get_name()) + + +def addOptions(parser): + + # TODO put this common group somewhere else but I don't know where + group=parser.add_argument_group('DMS and view options') + + group.add_argument('--default-dms','-d', + action="store", dest="obi:defaultdms", + metavar='', + default="/tmp/test_dms", + type=str, + help="Name of the default DMS for reading and writing data. " + "Default: /tmp/test_dms") + + group=parser.add_argument_group('obi test specific options') + + group.add_argument('--nb_tests','-n', + action="store", dest="test:nbtests", + metavar='', + default=1000, + type=int, + help="Number of tests to carry out. " + "Default: 1000") + + group.add_argument('--seq_max_len','-s', + action="store", dest="test:seqmaxlen", + metavar='', + default=200, + type=int, + help="Maximum length of DNA sequences." + "Default: 200") + + group.add_argument('--str_max_len','-t', + action="store", dest="test:strmaxlen", + metavar='', + default=200, + type=int, + help="Maximum length of character strings." + "Default: 200") + + group.add_argument('--comments_max_len','-c', + action="store", dest="test:commentsmaxlen", + metavar='', + default=10000, + type=int, + help="Maximum length of view comments." + "Default: 10000") + + group.add_argument('--max_ini_col_count','-o', + action="store", dest="test:maxinicolcount", + metavar='', + default=10, + type=int, + help="Maximum number of columns in the initial view." + "Default: 10") + + group.add_argument('--max_line_nb','-l', + action="store", dest="test:maxlinenb", + metavar='', + default=10000, + type=int, + help="Maximum number of lines in a column." + "Default: 10000") + + group.add_argument('--verbose','-v', + action="store_true", dest="test:verbose", + default=False, + help="Print the tests." + "Default: Don't print the tests") + +def run(config): + + infos = {'dms': None, + 'view': None, + 'view_names': None, + 'unique_names': [], + 'random_generator': {"OBI_BOOL": random_bool, "OBI_CHAR": random_char, "OBI_FLOAT": random_float, "OBI_INT": random_int, "OBI_SEQ": random_seq, "OBI_STR": random_str}, + 'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias] #, test_line_selection] # TODO + } + + print("Initializing the DMS and the first view...") + + ini_dms_and_first_view(config, infos) + + i = 0 + for t in range(config['test']['nbtests']): + random_test(config, infos) + #print(repr(infos['view'])) + i+=1 + # New view sometimes but max 100 because of issues with opened files number limit (news views and random modifications = lots of AVL files) TODO test without limit + if (i%(config['test']['nbtests']/100)) == 0 : + test_new_view(config, infos) + if (i%(config['test']['nbtests']/10)) == 0 : + print("Testing......"+str(i*100/config['test']['nbtests'])+"%") + + #print(infos) + + infos['view'].save_and_close() + infos['dms'].close() + shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True) + + print("Done.") + + \ No newline at end of file