2017-07-05 14:37:27 +02:00
#cython: language_level=3
2016-09-15 12:26:07 +02:00
from obitools3.apps.progress cimport ProgressBar # TODO I absolutely don't understand why it doesn't work without that line
2017-11-15 13:48:59 +01:00
from obitools3.dms.view import View , Line_selection
2017-04-21 12:09:04 +02:00
from obitools3.dms.view.typed_view.view_NUC_SEQS import View_NUC_SEQS
2017-11-15 13:48:59 +01:00
from obitools3.dms import DMS
2017-04-14 16:21:33 +02:00
from obitools3.dms.column import Column
2017-11-15 13:48:59 +01:00
from obitools3.dms.taxo import Taxonomy
2017-04-14 16:21:33 +02:00
from obitools3.utils cimport str2bytes
2017-07-05 14:37:27 +02:00
from obitools3.dms.capi.obitypes cimport OBI_INT , \
OBI_FLOAT , \
OBI_BOOL , \
OBI_CHAR , \
OBI_STR , \
OBI_SEQ
2018-05-17 14:54:45 +02:00
from obitools3.dms.capi.obiview cimport NUC_SEQUENCE_COLUMN , \
ID_COLUMN , \
DEFINITION_COLUMN , \
QUALITY_COLUMN , \
COUNT_COLUMN
2017-04-14 16:21:33 +02:00
import shutil
import string
import random
2020-07-27 17:38:45 +02:00
import sys
2019-09-21 16:47:22 +02:00
from cpython.exc cimport PyErr_CheckSignals
2017-07-05 14:37:27 +02:00
2017-07-28 13:15:13 +02:00
VIEW_TYPES = [ b " " , b " NUC_SEQS_VIEW " ]
2017-07-05 14:37:27 +02:00
COL_TYPES = [ OBI_INT , OBI_FLOAT , OBI_BOOL , OBI_CHAR , OBI_STR , OBI_SEQ ]
2017-04-14 16:21:33 +02:00
SPECIAL_COLUMNS = [ NUC_SEQUENCE_COLUMN , ID_COLUMN , DEFINITION_COLUMN , QUALITY_COLUMN ]
2017-04-21 12:09:04 +02:00
#TAXDUMP = "" TODO path=?
2017-07-28 13:15:13 +02:00
TAXTEST = b " taxtest "
2017-04-14 16:21:33 +02:00
NAME_MAX_LEN = 200
COL_COMMENTS_MAX_LEN = 2048
MAX_INT = 2147483647 # used to generate random float values
2021-03-29 13:07:48 +13:00
__title__ = " Test if the obitools are working properly "
2017-04-21 12:09:04 +02:00
2017-07-05 14:37:27 +02:00
default_config = {
2017-04-14 16:21:33 +02:00
}
2017-04-21 12:09:04 +02:00
def test_taxo ( config , infos ) :
2017-10-04 15:50:32 +02:00
tax1 = Taxonomy . open_taxdump ( infos [ ' dms ' ] , config [ ' obi ' ] [ ' taxo ' ] )
2017-04-21 12:09:04 +02:00
tax1 . write ( TAXTEST )
2017-10-04 15:50:32 +02:00
tax2 = Taxonomy . open ( infos [ ' dms ' ] , TAXTEST )
2017-04-21 12:09:04 +02:00
assert len ( tax1 ) == len ( tax2 ) , " Length of written taxonomy != length of read taxdump : " + str ( len ( tax2 ) ) + " != " + str ( len ( tax1 ) )
i = 0
for x in range ( config [ ' test ' ] [ ' nbtests ' ] ) :
idx = random . randint ( 0 , len ( tax1 ) - 1 )
t1 = tax1 . get_taxon_by_idx ( idx )
2017-11-24 17:54:10 +01:00
taxid1 = t1 . taxid
2017-04-21 12:09:04 +02:00
t2 = tax2 . get_taxon_by_idx ( idx )
2017-11-24 17:54:10 +01:00
taxid2 = t2 . taxid
assert t1 == t2 , " Taxon gotten from written taxonomy with index != taxon read from taxdump : " + str ( t2 ) + " != " + str ( t1 )
t1 = tax1 [ taxid1 ]
t2 = tax2 [ taxid2 ]
assert t1 == t2 , " Taxon gotten from written taxonomy with taxid != taxon read from taxdump : " + str ( t2 ) + " != " + str ( t1 )
2017-04-21 12:09:04 +02:00
i + = 1
if ( i % ( config [ ' test ' ] [ ' nbtests ' ] / 10 ) ) == 0 :
print ( " Testing taxonomy functions...... " + str ( i * 100 / config [ ' test ' ] [ ' nbtests ' ] ) + " % " )
tax1 . close ( )
tax2 . close ( )
2017-04-14 16:21:33 +02:00
def random_length ( max_len ) :
return random . randint ( 1 , max_len )
2017-04-21 12:09:04 +02:00
2017-04-14 16:21:33 +02:00
def random_bool ( config ) :
return random . choice ( [ True , False ] )
2017-04-21 12:09:04 +02:00
2017-11-15 13:48:59 +01:00
def random_bool_tuples ( config ) :
l = [ ]
for i in range ( random . randint ( 1 , config [ ' test ' ] [ ' tuplemaxlen ' ] ) ) :
l . append ( random . choice ( [ None , random_bool ( config ) ] ) )
return tuple ( l )
2017-04-14 16:21:33 +02:00
def random_char ( config ) :
return str2bytes ( random . choice ( string . ascii_lowercase ) )
2017-04-21 12:09:04 +02:00
2017-11-15 13:48:59 +01:00
def random_char_tuples ( config ) :
l = [ ]
for i in range ( random . randint ( 1 , config [ ' test ' ] [ ' tuplemaxlen ' ] ) ) :
l . append ( random . choice ( [ None , random_char ( config ) ] ) )
return tuple ( l )
2017-04-14 16:21:33 +02:00
def random_float ( config ) :
return random . randint ( 0 , MAX_INT ) + random . random ( )
2017-04-21 12:09:04 +02:00
2017-11-15 13:48:59 +01:00
def random_float_tuples ( config ) :
l = [ ]
for i in range ( random . randint ( 1 , config [ ' test ' ] [ ' tuplemaxlen ' ] ) ) :
l . append ( random . choice ( [ None , random_float ( config ) ] ) )
return tuple ( l )
2017-04-14 16:21:33 +02:00
def random_int ( config ) :
return random . randint ( 0 , config [ ' test ' ] [ ' maxlinenb ' ] )
2017-04-21 12:09:04 +02:00
2017-11-15 13:48:59 +01:00
def random_int_tuples ( config ) :
l = [ ]
for i in range ( random . randint ( 1 , config [ ' test ' ] [ ' tuplemaxlen ' ] ) ) :
l . append ( random . choice ( [ None , random_int ( config ) ] ) )
return tuple ( l )
2017-04-14 16:21:33 +02:00
def random_seq ( config ) :
return str2bytes ( ' ' . join ( random . choice ( [ ' a ' , ' t ' , ' g ' , ' c ' ] ) for i in range ( random_length ( config [ ' test ' ] [ ' seqmaxlen ' ] ) ) ) )
2017-04-21 12:09:04 +02:00
2017-11-15 13:48:59 +01:00
def random_seq_tuples ( config ) :
l = [ ]
for i in range ( random . randint ( 1 , config [ ' test ' ] [ ' tuplemaxlen ' ] ) ) :
l . append ( random . choice ( [ None , random_seq ( config ) ] ) )
return tuple ( l )
2017-04-14 16:21:33 +02:00
def random_bytes ( config ) :
return random_bytes_with_max_len ( config [ ' test ' ] [ ' strmaxlen ' ] )
2017-04-21 12:09:04 +02:00
2017-11-15 13:48:59 +01:00
def random_bytes_tuples ( config ) :
l = [ ]
for i in range ( random . randint ( 1 , config [ ' test ' ] [ ' tuplemaxlen ' ] ) ) :
l . append ( random . choice ( [ None , random_bytes ( config ) ] ) )
return tuple ( l )
2017-04-14 16:21:33 +02:00
def random_str_with_max_len ( max_len ) :
return ' ' . join ( random . choice ( string . ascii_lowercase ) for i in range ( random_length ( max_len ) ) )
2016-09-15 12:26:07 +02:00
2017-04-14 16:21:33 +02:00
def random_bytes_with_max_len ( max_len ) :
2017-07-28 13:15:13 +02:00
return str2bytes ( random_str_with_max_len ( max_len ) )
2017-04-14 16:21:33 +02:00
2018-10-07 19:10:46 +02:00
RANDOM_FUNCTIONS = [ random_bool , random_char , random_bytes , random_float , random_int ]
def random_comments ( config ) :
comments = { }
for i in range ( random_length ( 1000 ) ) :
to_add = { random_bytes ( config ) : random . choice ( RANDOM_FUNCTIONS ) ( config ) }
if len ( str ( comments ) ) + len ( str ( to_add ) ) > = COL_COMMENTS_MAX_LEN :
return comments
else :
comments . update ( to_add )
return comments
2017-04-14 16:21:33 +02:00
def random_column ( infos ) :
return random . choice ( sorted ( list ( infos [ ' view ' ] . keys ( ) ) ) )
def random_unique_name ( infos ) :
2017-07-28 13:15:13 +02:00
name = b " "
while name == b " " or name in infos [ ' unique_names ' ] :
name = random_bytes_with_max_len ( NAME_MAX_LEN )
2017-04-14 16:21:33 +02:00
infos [ ' unique_names ' ] . append ( name )
return name
def random_unique_element_name ( config , infos ) :
2017-07-28 13:15:13 +02:00
name = b " "
while name == b " " or name in infos [ ' unique_names ' ] :
name = random_bytes_with_max_len ( config [ ' test ' ] [ ' elt_name_max_len ' ] )
2017-04-14 16:21:33 +02:00
infos [ ' unique_names ' ] . append ( name )
return name
def print_test ( config , sentence ) :
if config [ ' test ' ] [ ' verbose ' ] :
print ( sentence )
def test_set_and_get ( config , infos ) :
print_test ( config , " >>> Set and get test " )
col_name = random_column ( infos )
col = infos [ ' view ' ] [ col_name ]
element_names = col . elements_names
data_type = col . data_type
2017-07-28 13:15:13 +02:00
if data_type == b " OBI_QUAL " :
2017-04-14 16:21:33 +02:00
print_test ( config , " - " )
return
idx = random_int ( config )
2017-11-15 13:48:59 +01:00
value = random . choice ( [ None , infos [ ' random_generator ' ] [ ( data_type , col . tuples ) ] ( config ) ] )
2017-04-14 16:21:33 +02:00
if col . nb_elements_per_line > 1 :
elt = random . choice ( element_names )
col [ idx ] [ elt ] = value
assert col [ idx ] [ elt ] == value , " Column: " + repr ( col ) + " \n Set value != gotten value " + str ( value ) + " != " + str ( col [ idx ] [ elt ] )
2017-11-15 13:48:59 +01:00
elif col . tuples :
col [ idx ] = value
if value is None :
totest = None
else :
totest = [ ]
for e in value :
if e is not None and e != ' ' :
totest . append ( e )
if len ( totest ) == 0 :
totest = None
else :
totest = tuple ( totest )
assert col [ idx ] == totest , " Column: " + repr ( col ) + " \n Set value != gotten value " + str ( totest ) + " != " + str ( col [ idx ] )
if totest is not None :
for i in range ( len ( totest ) ) :
assert col [ idx ] [ i ] == totest [ i ] , " Column: " + repr ( col ) + " \n Set value[i] != gotten value[i] " + str ( totest [ i ] ) + " != " + str ( col [ idx ] [ i ] )
2017-04-14 16:21:33 +02:00
else :
col [ idx ] = value
assert col [ idx ] == value , " Column: " + repr ( col ) + " \n Set value != gotten value " + str ( value ) + " != " + str ( col [ idx ] )
print_test ( config , " >>> Set and get test OK " )
def test_add_col ( config , infos ) :
print_test ( config , " >>> Add column test " )
#existing_col = random_bool(config) # TODO doesn't work because of line count problem. See obiview.c line 1737
#if existing_col and infos["view_names"] != [] :
# random_view = infos['dms'].open_view(random.choice(infos["view_names"]))
# random_column = random_view[random.choice(sorted(list(random_view.columns))]
# random_column_refs = random_column.refs
# if random_column_refs['name'] in infos['view'] :
# alias = random_unique_name(infos)
# else :
# alias = ''
# infos['view'].add_column(random_column_refs['name'], version_number=random_column_refs['version'], alias=alias, create=False)
# random_view.close()
#else :
create_random_column ( config , infos )
print_test ( config , " >>> Add column test OK " )
def test_delete_col ( config , infos ) :
print_test ( config , " >>> Delete column test " )
if len ( list ( infos [ ' view ' ] . keys ( ) ) ) < = 1 :
print_test ( config , " - " )
return
col_name = random_column ( infos )
if col_name in SPECIAL_COLUMNS :
print_test ( config , " - " )
return
infos [ ' view ' ] . delete_column ( col_name )
print_test ( config , " >>> Delete column test OK " )
def test_col_alias ( config , infos ) :
print_test ( config , " >>> Changing column alias test " )
col_name = random_column ( infos )
if col_name in SPECIAL_COLUMNS :
print_test ( config , " - " )
return
infos [ ' view ' ] [ col_name ] . name = random_unique_name ( infos )
print_test ( config , " >>> Changing column alias test OK " )
def test_new_view ( config , infos ) :
print_test ( config , " >>> New view test " )
random_new_view ( config , infos )
print_test ( config , " >>> New view test OK " )
def random_test ( config , infos ) :
return random . choice ( infos [ ' tests ' ] ) ( config , infos )
def random_view_type ( ) :
return random . choice ( VIEW_TYPES )
def random_col_type ( ) :
return random . choice ( COL_TYPES )
def fill_column ( config , infos , col ) :
data_type = col . data_type
element_names = col . elements_names
if len ( element_names ) > 1 :
for i in range ( random_int ( config ) ) :
for j in range ( len ( element_names ) ) :
2017-11-15 13:48:59 +01:00
col [ i ] [ element_names [ j ] ] = random . choice ( [ None , infos [ ' random_generator ' ] [ ( data_type , col . tuples ) ] ( config ) ] )
2017-04-14 16:21:33 +02:00
else :
for i in range ( random_int ( config ) ) :
2017-11-15 13:48:59 +01:00
r = random . choice ( [ None , infos [ ' random_generator ' ] [ ( data_type , col . tuples ) ] ( config ) ] )
col [ i ] = r
2017-04-14 16:21:33 +02:00
def create_random_column ( config , infos ) :
2017-07-28 13:15:13 +02:00
alias = random . choice ( [ b ' ' , random_unique_name ( infos ) ] )
2017-11-15 13:48:59 +01:00
tuples = random . choice ( [ True , False ] )
2021-03-11 16:31:31 +13:00
dict_column = False
2017-11-15 13:48:59 +01:00
if not tuples :
nb_elements_per_line = random . randint ( 1 , config [ ' test ' ] [ ' maxelts ' ] )
2021-03-10 16:50:30 +13:00
if nb_elements_per_line > 1 :
dict_column = True
2017-11-15 13:48:59 +01:00
elements_names = [ ]
for i in range ( nb_elements_per_line ) :
elements_names . append ( random_unique_element_name ( config , infos ) )
elements_names = random . choice ( [ None , elements_names ] )
else :
nb_elements_per_line = 1
elements_names = None
2017-04-14 16:21:33 +02:00
name = random_unique_name ( infos )
data_type = random_col_type ( )
column = Column . new_column ( infos [ ' view ' ] ,
name ,
data_type ,
nb_elements_per_line = nb_elements_per_line ,
elements_names = elements_names ,
2021-03-10 16:50:30 +13:00
dict_column = dict_column ,
2017-11-15 13:48:59 +01:00
tuples = tuples ,
2018-10-07 19:10:46 +02:00
comments = random_comments ( config ) ,
2017-04-14 16:21:33 +02:00
alias = alias
)
2017-07-05 14:37:27 +02:00
2017-07-28 13:15:13 +02:00
if alias != b ' ' :
2017-04-14 16:21:33 +02:00
assert infos [ ' view ' ] [ alias ] == column
else :
assert infos [ ' view ' ] [ name ] == column
return column
def fill_view ( config , infos ) :
for i in range ( random . randint ( 1 , config [ ' test ' ] [ ' maxinicolcount ' ] ) ) :
col = create_random_column ( config , infos )
fill_column ( config , infos , col )
def random_new_view ( config , infos , first = False ) :
v_to_clone = None
line_selection = None
quality_col = False # TODO
if not first :
infos [ ' view_names ' ] . append ( infos [ ' view ' ] . name )
infos [ ' view ' ] . close ( )
v_to_clone = View . open ( infos [ ' dms ' ] , random . choice ( infos [ " view_names " ] ) )
2017-07-28 13:15:13 +02:00
v_type = b " "
2017-04-14 16:21:33 +02:00
print_test ( config , " View to clone: " )
print_test ( config , repr ( v_to_clone ) )
create_line_selection = random_bool ( config )
if create_line_selection and v_to_clone . line_count > 0 :
print_test ( config , " New view with new line selection. " )
line_selection = Line_selection ( v_to_clone )
for i in range ( random . randint ( 1 , v_to_clone . line_count ) ) :
line_selection . append ( random . randint ( 0 , v_to_clone . line_count - 1 ) )
#print_test(config, "New line selection: "+str(line_selection))
else :
v_type = random_view_type ( )
if line_selection is not None :
2018-10-07 19:10:46 +02:00
infos [ ' view ' ] = line_selection . materialize ( random_unique_name ( infos ) , comments = random_comments ( config ) )
2017-04-14 16:21:33 +02:00
elif v_to_clone is not None :
2018-10-07 19:10:46 +02:00
infos [ ' view ' ] = v_to_clone . clone ( random_unique_name ( infos ) , comments = random_comments ( config ) )
2017-04-14 16:21:33 +02:00
else :
2017-04-21 12:09:04 +02:00
if v_type == " NUC_SEQS_VIEW " :
2018-10-07 19:10:46 +02:00
infos [ ' view ' ] = View_NUC_SEQS . new ( infos [ ' dms ' ] , random_unique_name ( infos ) , comments = random_comments ( config ) ) # TODO quality column
2017-04-21 12:09:04 +02:00
else :
2018-10-07 19:10:46 +02:00
infos [ ' view ' ] = View . new ( infos [ ' dms ' ] , random_unique_name ( infos ) , comments = random_comments ( config ) ) # TODO quality column
2020-07-27 17:38:45 +02:00
infos [ ' view ' ] . write_config ( config , " test " , infos [ " command_line " ] , input_dms_name = [ infos [ ' dms ' ] . name ] , input_view_name = [ " random " ] )
2017-04-14 16:21:33 +02:00
print_test ( config , repr ( infos [ ' view ' ] ) )
if v_to_clone is not None :
if line_selection is None :
assert v_to_clone . line_count == infos [ ' view ' ] . line_count , " New view and cloned view don ' t have the same line count : " + str ( v_to_clone . line_count ) + " (view to clone line count) != " + str ( infos [ ' view ' ] . line_count ) + " (new view line count) "
else :
assert len ( line_selection ) == infos [ ' view ' ] . line_count , " New view with new line selection does not have the right line count : " + str ( len ( line_selection ) ) + " (line selection length) != " + str ( infos [ ' view ' ] . line_count ) + " (new view line count) "
v_to_clone . close ( )
if first :
fill_view ( config , infos )
2017-07-05 14:37:27 +02:00
2017-04-14 16:21:33 +02:00
def create_test_obidms ( config , infos ) :
infos [ ' dms ' ] = DMS . new ( config [ ' obi ' ] [ ' defaultdms ' ] )
2017-07-05 14:37:27 +02:00
2017-04-14 16:21:33 +02:00
def ini_dms_and_first_view ( config , infos ) :
create_test_obidms ( config , infos )
random_new_view ( config , infos , first = True )
infos [ ' view_names ' ] = [ ]
2017-07-05 14:37:27 +02:00
2017-04-14 16:21:33 +02:00
def addOptions ( parser ) :
# TODO put this common group somewhere else but I don't know where
group = parser . add_argument_group ( ' DMS and view options ' )
group . add_argument ( ' --default-dms ' , ' -d ' ,
action = " store " , dest = " obi:defaultdms " ,
metavar = ' <DMS NAME> ' ,
default = " /tmp/test_dms " ,
type = str ,
help = " Name of the default DMS for reading and writing data. "
" Default: /tmp/test_dms " )
2017-04-21 12:09:04 +02:00
2017-07-05 14:37:27 +02:00
group . add_argument ( ' --taxo ' , ' -t ' , # TODO I don't understand why the option is not registered if it is not set
2017-04-21 12:09:04 +02:00
action = " store " , dest = " obi:taxo " ,
metavar = ' <TAXDUMP PATH> ' ,
2017-07-05 14:37:27 +02:00
default = ' ' , # TODO not None because if it's None, the option is not entered in the option dictionary.
2017-04-21 12:09:04 +02:00
type = str ,
2017-07-05 14:37:27 +02:00
help = " Path to a taxdump to test the taxonomy. " )
2017-04-21 12:09:04 +02:00
2017-04-14 16:21:33 +02:00
group = parser . add_argument_group ( ' obi test specific options ' )
group . add_argument ( ' --nb_tests ' , ' -n ' ,
action = " store " , dest = " test:nbtests " ,
metavar = ' <NB_TESTS> ' ,
default = 1000 ,
type = int ,
help = " Number of tests to carry out. "
" Default: 1000 " )
group . add_argument ( ' --seq_max_len ' , ' -s ' ,
action = " store " , dest = " test:seqmaxlen " ,
metavar = ' <SEQ_MAX_LEN> ' ,
default = 200 ,
type = int ,
help = " Maximum length of DNA sequences. "
" Default: 200 " )
2017-04-21 12:09:04 +02:00
group . add_argument ( ' --str_max_len ' , ' -r ' ,
2017-04-14 16:21:33 +02:00
action = " store " , dest = " test:strmaxlen " ,
metavar = ' <STR_MAX_LEN> ' ,
default = 200 ,
type = int ,
help = " Maximum length of character strings. "
" Default: 200 " )
2017-11-15 13:48:59 +01:00
group . add_argument ( ' --tuple_max_len ' , ' -u ' ,
action = " store " , dest = " test:tuplemaxlen " ,
metavar = ' <TUPLE_MAX_LEN> ' ,
default = 20 ,
type = int ,
help = " Maximum length of tuples. "
2021-02-10 17:28:15 +13:00
" Default: 20 " )
2018-10-07 19:10:46 +02:00
2017-04-14 16:21:33 +02:00
group . add_argument ( ' --max_ini_col_count ' , ' -o ' ,
action = " store " , dest = " test:maxinicolcount " ,
metavar = ' <MAX_INI_COL_COUNT> ' ,
default = 10 ,
type = int ,
help = " Maximum number of columns in the initial view. "
" Default: 10 " )
group . add_argument ( ' --max_line_nb ' , ' -l ' ,
action = " store " , dest = " test:maxlinenb " ,
metavar = ' <MAX_LINE_NB> ' ,
2021-02-10 17:28:15 +13:00
default = 1000 ,
2017-04-14 16:21:33 +02:00
type = int ,
help = " Maximum number of lines in a column. "
2020-07-26 17:37:21 +02:00
" Default: 1000 " )
2017-04-14 16:21:33 +02:00
group . add_argument ( ' --max_elts_per_line ' , ' -e ' ,
action = " store " , dest = " test:maxelts " ,
metavar = ' <MAX_ELTS_PER_LINE> ' ,
default = 20 ,
type = int ,
help = " Maximum number of elements per line in a column. "
" Default: 20 " )
group . add_argument ( ' --verbose ' , ' -v ' ,
action = " store_true " , dest = " test:verbose " ,
default = False ,
help = " Print the tests. "
" Default: Don ' t print the tests " )
group . add_argument ( ' --seed ' , ' -g ' ,
action = " store " , dest = " test:seed " ,
metavar = ' <SEED> ' ,
default = None ,
help = " Seed (use for reproducible tests). "
" Default: Seed is determined by Python " )
def run ( config ) :
if ' seed ' in config [ ' test ' ] :
random . seed ( config [ ' test ' ] [ ' seed ' ] )
infos = { ' dms ' : None ,
' view ' : None ,
' view_names ' : None ,
' unique_names ' : [ ] ,
2017-11-15 13:48:59 +01:00
' random_generator ' : {
( b " OBI_BOOL " , False ) : random_bool , ( b " OBI_BOOL " , True ) : random_bool_tuples ,
( b " OBI_CHAR " , False ) : random_char , ( b " OBI_CHAR " , True ) : random_char_tuples ,
( b " OBI_FLOAT " , False ) : random_float , ( b " OBI_FLOAT " , True ) : random_float_tuples ,
( b " OBI_INT " , False ) : random_int , ( b " OBI_INT " , True ) : random_int_tuples ,
( b " OBI_SEQ " , False ) : random_seq , ( b " OBI_SEQ " , True ) : random_seq_tuples ,
( b " OBI_STR " , False ) : random_bytes , ( b " OBI_STR " , True ) : random_bytes_tuples
} ,
2020-07-27 17:38:45 +02:00
' tests ' : [ test_set_and_get , test_add_col , test_delete_col , test_col_alias , test_new_view ] ,
' command_line ' : " " . join ( sys . argv [ 1 : ] )
2017-04-14 16:21:33 +02:00
}
2018-10-07 19:10:46 +02:00
# TODO ???
2017-04-14 16:21:33 +02:00
config [ ' test ' ] [ ' elt_name_max_len ' ] = int ( ( COL_COMMENTS_MAX_LEN - config [ ' test ' ] [ ' maxelts ' ] ) / config [ ' test ' ] [ ' maxelts ' ] )
print ( " Initializing the DMS and the first view... " )
shutil . rmtree ( config [ ' obi ' ] [ ' defaultdms ' ] + ' .obidms ' , ignore_errors = True )
2017-04-21 12:09:04 +02:00
2017-04-14 16:21:33 +02:00
ini_dms_and_first_view ( config , infos )
print_test ( config , repr ( infos [ ' view ' ] ) )
2017-07-05 14:37:27 +02:00
2017-04-14 16:21:33 +02:00
i = 0
for t in range ( config [ ' test ' ] [ ' nbtests ' ] ) :
2019-09-21 16:47:22 +02:00
PyErr_CheckSignals ( )
2017-04-14 16:21:33 +02:00
random_test ( config , infos )
print_test ( config , repr ( infos [ ' view ' ] ) )
i + = 1
if ( i % ( config [ ' test ' ] [ ' nbtests ' ] / 10 ) ) == 0 :
print ( " Testing...... " + str ( i * 100 / config [ ' test ' ] [ ' nbtests ' ] ) + " % " )
2019-09-04 16:48:13 +02:00
#lsof = subprocess.Popen("lsof | grep '/private/tmp/test_dms.obidms/'", stdin=subprocess.PIPE, shell=True )
#lsof.communicate( b"LSOF\n" )
#lsof = subprocess.Popen("lsof | wc -l", stdin=subprocess.PIPE, shell=True )
#lsof.communicate( b"LSOF total\n" )
2017-04-14 16:21:33 +02:00
#print(infos)
2017-04-21 12:09:04 +02:00
2017-07-05 14:37:27 +02:00
if config [ ' obi ' ] [ ' taxo ' ] != ' ' :
test_taxo ( config , infos )
2017-04-21 12:09:04 +02:00
2017-04-14 16:21:33 +02:00
infos [ ' view ' ] . close ( )
2020-04-12 17:31:58 +02:00
infos [ ' dms ' ] . close ( force = True )
2017-04-14 16:21:33 +02:00
shutil . rmtree ( config [ ' obi ' ] [ ' defaultdms ' ] + ' .obidms ' , ignore_errors = True )
print ( " Done. " )
2017-07-05 14:37:27 +02:00