diff --git a/python/obitools3/commands/export.pyx b/python/obitools3/commands/export.pyx index fc6ba25..b3b0212 100644 --- a/python/obitools3/commands/export.pyx +++ b/python/obitools3/commands/export.pyx @@ -1,105 +1,109 @@ -from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport -from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work -from obitools3.utils cimport bytes2str +# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport +# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work +# from obitools3.utils cimport bytes2str +# +# import time +# import re -import time -import re - -__title__="Export a NUC_SEQS view to a fasta or fastq file" - - -default_config = { 'inputview' : None, - } - -def addOptions(parser): - - # TODO put this common group somewhere else but I don't know where - group=parser.add_argument_group('DMS and view options') - - group.add_argument('--default-dms','-d', - action="store", dest="obi:defaultdms", - metavar='', - default=None, - type=str, - help="Name of the default DMS for reading and writing data.") - - group.add_argument('--input-view','-i', - action="store", dest="obi:inputview", - metavar='', - default=None, - type=str, - help="Name of the input view, either raw if the view is in the default DMS," - " or in the form 'dms:view' if it is in another DMS.") - - group=parser.add_argument_group('obi export specific options') - - group.add_argument('--format','-f', - action="store", dest="export:format", - metavar='', - default="fasta", - type=str, - help="Export in the format , 'fasta' or 'fastq'. Default: 'fasta'.") # TODO export in csv def run(config): - - # TODO import doesn't work - NUC_SEQUENCE_COLUMN = "NUC_SEQ" - ID_COLUMN = "ID" - DEFINITION_COLUMN = "DEFINITION" - QUALITY_COLUMN = "QUALITY" - - special_columns = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN] - - # Open DMS - d = OBIDMS(config['obi']['defaultdms']) - - # Open input view - iview = d.open_view(config['obi']['inputview']) - - print(iview.type) - - # TODO check that the view has the type NUC_SEQS - if ((config['export']['format'] == "fasta") or (config['export']['format'] == "fastq")) and (iview.type != "NUC_SEQS_VIEW") : # TODO find a way to import those macros - raise Exception("Error: the view to export in fasta or fastq format is not a NUC_SEQS view") - - # Initialize the progress bar - pb = ProgressBar(len(iview), config, seconde=5) - - i=0 - for seq in iview : - pb(i) - - toprint = ">"+seq.id+" " - - for col_name in seq : - if col_name not in special_columns : - toprint = toprint + col_name + "=" + str(seq[col_name]) + "; " - - if DEFINITION_COLUMN in seq : - toprint = toprint + seq.definition - - nucseq = bytes2str(seq.nuc_seq) - - if config['export']['format'] == "fasta" : - nucseq = re.sub("(.{60})", "\\1\n", nucseq, 0, re.DOTALL) - - toprint = toprint + "\n" + nucseq - - if config['export']['format'] == "fastq" : - toprint = toprint + "\n" + "+" + "\n" + seq.get_str_quality() - - print(toprint) - i+=1 - - iview.close() - d.close() - - print("Done.") - - + pass - - - - - \ No newline at end of file +# __title__="Export a NUC_SEQS view to a fasta or fastq file" +# +# +# default_config = { 'inputview' : None, +# } +# +# def addOptions(parser): +# +# # TODO put this common group somewhere else but I don't know where +# group=parser.add_argument_group('DMS and view options') +# +# group.add_argument('--default-dms','-d', +# action="store", dest="obi:defaultdms", +# metavar='', +# default=None, +# type=str, +# help="Name of the default DMS for reading and writing data.") +# +# group.add_argument('--input-view','-i', +# action="store", dest="obi:inputview", +# metavar='', +# default=None, +# type=str, +# help="Name of the input view, either raw if the view is in the default DMS," +# " or in the form 'dms:view' if it is in another DMS.") +# +# group=parser.add_argument_group('obi export specific options') +# +# group.add_argument('--format','-f', +# action="store", dest="export:format", +# metavar='', +# default="fasta", +# type=str, +# help="Export in the format , 'fasta' or 'fastq'. Default: 'fasta'.") # TODO export in csv +# +# def run(config): +# +# # TODO import doesn't work +# NUC_SEQUENCE_COLUMN = "NUC_SEQ" +# ID_COLUMN = "ID" +# DEFINITION_COLUMN = "DEFINITION" +# QUALITY_COLUMN = "QUALITY" +# +# special_columns = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN] +# +# # Open DMS +# d = OBIDMS(config['obi']['defaultdms']) +# +# # Open input view +# iview = d.open_view(config['obi']['inputview']) +# +# print(iview.type) +# +# # TODO check that the view has the type NUC_SEQS +# if ((config['export']['format'] == "fasta") or (config['export']['format'] == "fastq")) and (iview.type != "NUC_SEQS_VIEW") : # TODO find a way to import those macros +# raise Exception("Error: the view to export in fasta or fastq format is not a NUC_SEQS view") +# +# # Initialize the progress bar +# pb = ProgressBar(len(iview), config, seconde=5) +# +# i=0 +# for seq in iview : +# pb(i) +# +# toprint = ">"+seq.id+" " +# +# for col_name in seq : +# if col_name not in special_columns : +# toprint = toprint + col_name + "=" + str(seq[col_name]) + "; " +# +# if DEFINITION_COLUMN in seq : +# toprint = toprint + seq.definition +# +# nucseq = bytes2str(seq.nuc_seq) +# +# if config['export']['format'] == "fasta" : +# nucseq = re.sub("(.{60})", "\\1\n", nucseq, 0, re.DOTALL) +# +# toprint = toprint + "\n" + nucseq +# +# if config['export']['format'] == "fastq" : +# toprint = toprint + "\n" + "+" + "\n" + seq.get_str_quality() +# +# print(toprint) +# i+=1 +# +# iview.close() +# d.close() +# +# print("Done.") +# +# +# +# +# +# +# +# \ No newline at end of file diff --git a/python/obitools3/commands/grep.pyx b/python/obitools3/commands/grep.pyx index d34d439..96f7768 100644 --- a/python/obitools3/commands/grep.pyx +++ b/python/obitools3/commands/grep.pyx @@ -1,97 +1,100 @@ -from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport -from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work -#, OBIView, OBIView_line_selection # TODO cimport doesn't work +# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport +# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work +# #, OBIView, OBIView_line_selection # TODO cimport doesn't work +# +# from functools import reduce +# import time +# +# __title__="Grep view lines that match the given predicates" +# -from functools import reduce -import time - -__title__="Grep view lines that match the given predicates" - - -default_config = { 'inputview' : None, - 'outputview' : None - } - -def addOptions(parser): - - # TODO put this common group somewhere else but I don't know where - group=parser.add_argument_group('DMS and view options') - - group.add_argument('--default-dms','-d', - action="store", dest="obi:defaultdms", - metavar='', - default=None, - type=str, - help="Name of the default DMS for reading and writing data.") - - group.add_argument('--input-view','-i', - action="store", dest="obi:inputview", - metavar='', - default=None, - type=str, - help="Name of the input view, either raw if the view is in the default DMS," - " or in the form 'dms:view' if it is in another DMS.") - - group.add_argument('--output-view','-o', - action="store", dest="obi:outputview", - metavar='', - default=None, - type=str, - help="Name of the output view, either raw if the view is in the default DMS," - " or in the form 'dms:view' if it is in another DMS.") - - - group=parser.add_argument_group('obi grep specific options') - - group.add_argument('--predicate','-p', - action="append", dest="grep:predicates", - metavar='', - default=None, - type=str, - help="Grep lines that match the given python expression on or .") - - def run(config): - - # Open DMS - d = OBIDMS(config['obi']['defaultdms']) - - # Open input view 1 -# iview = d.open_view(config['obi']['inputview']) -# -# # Initialize the progress bar -# pb = ProgressBar(len(iview), config, seconde=5) -# -# # Apply filter -# selection = OBIView_line_selection(iview) -# for i in range(len(iview)) : -# pb(i) -# line = iview[i] -# -# loc_env = {'sequence': line, 'line': line} # TODO add taxonomy -# -# good = (reduce(lambda bint x, bint y: x and y, -# (bool(eval(p, loc_env, line)) -# for p in config['grep']['predicates']), True)) -# -# if good : -# selection.append(i) -# -# # Create output view with the line selection -# oview = d.clone_view_with_line_selection(config['obi']['outputview'], selection, comments="obi grep: "+str(config['grep']['predicates'])+"\n") -# -# #print("\n") -# #print(repr(oview)) -# -# iview.close() -# oview.close() - d.close() - - - + pass - - - - - \ No newline at end of file +# default_config = { 'inputview' : None, +# 'outputview' : None +# } +# +# def addOptions(parser): +# +# # TODO put this common group somewhere else but I don't know where +# group=parser.add_argument_group('DMS and view options') +# +# group.add_argument('--default-dms','-d', +# action="store", dest="obi:defaultdms", +# metavar='', +# default=None, +# type=str, +# help="Name of the default DMS for reading and writing data.") +# +# group.add_argument('--input-view','-i', +# action="store", dest="obi:inputview", +# metavar='', +# default=None, +# type=str, +# help="Name of the input view, either raw if the view is in the default DMS," +# " or in the form 'dms:view' if it is in another DMS.") +# +# group.add_argument('--output-view','-o', +# action="store", dest="obi:outputview", +# metavar='', +# default=None, +# type=str, +# help="Name of the output view, either raw if the view is in the default DMS," +# " or in the form 'dms:view' if it is in another DMS.") +# +# +# group=parser.add_argument_group('obi grep specific options') +# +# group.add_argument('--predicate','-p', +# action="append", dest="grep:predicates", +# metavar='', +# default=None, +# type=str, +# help="Grep lines that match the given python expression on or .") +# +# +# def run(config): +# +# # Open DMS +# d = OBIDMS(config['obi']['defaultdms']) +# +# # Open input view 1 +# # iview = d.open_view(config['obi']['inputview']) +# # +# # # Initialize the progress bar +# # pb = ProgressBar(len(iview), config, seconde=5) +# # +# # # Apply filter +# # selection = OBIView_line_selection(iview) +# # for i in range(len(iview)) : +# # pb(i) +# # line = iview[i] +# # +# # loc_env = {'sequence': line, 'line': line} # TODO add taxonomy +# # +# # good = (reduce(lambda bint x, bint y: x and y, +# # (bool(eval(p, loc_env, line)) +# # for p in config['grep']['predicates']), True)) +# # +# # if good : +# # selection.append(i) +# # +# # # Create output view with the line selection +# # oview = d.clone_view_with_line_selection(config['obi']['outputview'], selection, comments="obi grep: "+str(config['grep']['predicates'])+"\n") +# # +# # #print("\n") +# # #print(repr(oview)) +# # +# # iview.close() +# # oview.close() +# d.close() +# +# +# +# +# +# +# +# +# \ No newline at end of file diff --git a/python/obitools3/commands/import.pyx b/python/obitools3/commands/import.pyx index f621fee..4b16d70 100644 --- a/python/obitools3/commands/import.pyx +++ b/python/obitools3/commands/import.pyx @@ -1,130 +1,133 @@ -from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport -from obitools3.files.universalopener cimport uopen -from obitools3.parsers.fasta import fastaIterator -from obitools3.parsers.fastq import fastqIterator -from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work +# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport +# from obitools3.files.universalopener cimport uopen +# from obitools3.parsers.fasta import fastaIterator +# from obitools3.parsers.fastq import fastqIterator +# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work +# +# import time +# -import time - - -__title__="Counts sequences in a sequence set" - - -default_config = { 'destview' : None, - 'skip' : 0, - 'only' : None, - 'skiperror' : False, - 'seqinformat' : None, - 'moltype' : 'nuc', - 'filename' : None - } - -def addOptions(parser): - parser.add_argument(dest='import:filename', - metavar='', - nargs='?', - default=None, - help='sequence file name to be imported' ) - - group=parser.add_argument_group('obi import specific options') - - group.add_argument('--default-dms','-d', - action="store", dest="obi:defaultdms", - metavar='', - default=None, - type=str, - help="Name of the default DMS for reading and writing data") - - group.add_argument('--destination-view','-v', - action="store", dest="import:destview", - metavar='', - default=None, - type=str, - required=True, - help="Name of the default DMS for reading and writing data") - - group.add_argument('--skip', - action="store", dest="import:skip", - metavar='', - default=None, - type=int, - help="skip the N first sequences") - - group.add_argument('--only', - action="store", dest="import:only", - metavar='', - default=None, - type=int, - help="treat only N sequences") - - group.add_argument('--skip-on-error', - action="store_true", dest="import:skiperror", - default=None, - help="Skip sequence entries with parse error") - - group.add_argument('--fasta', - action="store_const", dest="import:seqinformat", - default=None, - const='fasta', - help="Input file is in fasta nucleic format (including obitools fasta extentions)") - - group.add_argument('--fastq', - action="store_const", dest="import:seqinformat", - default=None, - const='fastq', - help="Input file is in sanger fastq nucleic format (standard fastq)") - - group.add_argument('--nuc', - action="store_const", dest="import:moltype", - default=None, - const='nuc', - help="Input file contains nucleic sequences") - - group.add_argument('--prot', - action="store_const", dest="import:moltype", - default=None, - const='pep', - help="Input file contains protein sequences") - - - -# TODO: Handling of NA values def run(config): - pb = ProgressBar(35000000, config, seconde=5) # TODO should be number of records in file - - inputs = uopen(config['import']['filename']) - - get_quality = False - if config['import']['seqinformat']=='fasta': - iseq = fastaIterator(inputs) - view_type="NUC_SEQS_VIEW" - elif config['import']['seqinformat']=='fastq': - iseq = fastqIterator(inputs) - view_type="NUC_SEQS_VIEW" - get_quality = True - else: - raise RuntimeError('No file format specified') - - # Create DMS - d = OBIDMS(config['obi']['defaultdms']) - - # Create view -# view = d.new_view(config['import']['destview'], view_type=view_type, quality_column=get_quality) + pass + +# __title__="Counts sequences in a sequence set" +# +# +# default_config = { 'destview' : None, +# 'skip' : 0, +# 'only' : None, +# 'skiperror' : False, +# 'seqinformat' : None, +# 'moltype' : 'nuc', +# 'filename' : None +# } +# +# def addOptions(parser): +# parser.add_argument(dest='import:filename', +# metavar='', +# nargs='?', +# default=None, +# help='sequence file name to be imported' ) +# +# group=parser.add_argument_group('obi import specific options') +# +# group.add_argument('--default-dms','-d', +# action="store", dest="obi:defaultdms", +# metavar='', +# default=None, +# type=str, +# help="Name of the default DMS for reading and writing data") # -# i = 0 -# for seq in iseq: -# pb(i) -# view[i].id = seq['id'] -# view[i].definition = seq['definition'] -# view[i].nuc_seq = seq['sequence'] -# if get_quality : -# view[i].quality = seq['quality'] -# for tag in seq['tags'] : -# view[i][tag] = seq['tags'][tag] -# i+=1 +# group.add_argument('--destination-view','-v', +# action="store", dest="import:destview", +# metavar='', +# default=None, +# type=str, +# required=True, +# help="Name of the default DMS for reading and writing data") +# +# group.add_argument('--skip', +# action="store", dest="import:skip", +# metavar='', +# default=None, +# type=int, +# help="skip the N first sequences") # -# #print(view.__repr__()) +# group.add_argument('--only', +# action="store", dest="import:only", +# metavar='', +# default=None, +# type=int, +# help="treat only N sequences") # -# view.close() - d.close() - +# group.add_argument('--skip-on-error', +# action="store_true", dest="import:skiperror", +# default=None, +# help="Skip sequence entries with parse error") +# +# group.add_argument('--fasta', +# action="store_const", dest="import:seqinformat", +# default=None, +# const='fasta', +# help="Input file is in fasta nucleic format (including obitools fasta extentions)") +# +# group.add_argument('--fastq', +# action="store_const", dest="import:seqinformat", +# default=None, +# const='fastq', +# help="Input file is in sanger fastq nucleic format (standard fastq)") +# +# group.add_argument('--nuc', +# action="store_const", dest="import:moltype", +# default=None, +# const='nuc', +# help="Input file contains nucleic sequences") +# +# group.add_argument('--prot', +# action="store_const", dest="import:moltype", +# default=None, +# const='pep', +# help="Input file contains protein sequences") +# +# +# +# # TODO: Handling of NA values +# def run(config): +# pb = ProgressBar(35000000, config, seconde=5) # TODO should be number of records in file +# +# inputs = uopen(config['import']['filename']) +# +# get_quality = False +# if config['import']['seqinformat']=='fasta': +# iseq = fastaIterator(inputs) +# view_type="NUC_SEQS_VIEW" +# elif config['import']['seqinformat']=='fastq': +# iseq = fastqIterator(inputs) +# view_type="NUC_SEQS_VIEW" +# get_quality = True +# else: +# raise RuntimeError('No file format specified') +# +# # Create DMS +# d = OBIDMS(config['obi']['defaultdms']) +# +# # Create view +# # view = d.new_view(config['import']['destview'], view_type=view_type, quality_column=get_quality) +# # +# # i = 0 +# # for seq in iseq: +# # pb(i) +# # view[i].id = seq['id'] +# # view[i].definition = seq['definition'] +# # view[i].nuc_seq = seq['sequence'] +# # if get_quality : +# # view[i].quality = seq['quality'] +# # for tag in seq['tags'] : +# # view[i][tag] = seq['tags'][tag] +# # i+=1 +# # +# # #print(view.__repr__()) +# # +# # view.close() +# d.close() +# diff --git a/python/obitools3/commands/lcs.pyx b/python/obitools3/commands/lcs.pyx index d787bac..c76375e 100644 --- a/python/obitools3/commands/lcs.pyx +++ b/python/obitools3/commands/lcs.pyx @@ -1,213 +1,213 @@ #cython: language_level=3 - -from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport -from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work -from obitools3.utils cimport str2bytes - -from obitools3.dms.capi.obialign cimport obi_lcs_align_one_column, \ - obi_lcs_align_two_columns - - -import time - -__title__="Aligns one sequence column with itself or two sequence columns" - - -default_config = { 'inputview' : None, - } - -def addOptions(parser): - - # TODO put this common group somewhere else but I don't know where. - # Also some options should probably be in another group - group=parser.add_argument_group('DMS and view options') - - group.add_argument('--default-dms', '-d', - action="store", dest="obi:defaultdms", - metavar='', - default=None, - type=str, - help="Name of the default DMS for reading and writing data.") - - group.add_argument('--input-view-1', '-i', - action="store", dest="obi:inputview1", - metavar='', - default=None, - type=str, - help="Name of the (first) input view.") - - group.add_argument('--input-view-2', '-I', - action="store", dest="obi:inputview2", - metavar='', - default="", - type=str, - help="Eventually, the name of the second input view.") - - group.add_argument('--input-column-1', '-c', - action="store", dest="obi:inputcolumn1", - metavar='', - default="", - type=str, - help="Name of the (first) input column. " - " Default: the default nucleotide sequence column of the view if there is one.") - - group.add_argument('--input-column-2', '-C', - action="store", dest="obi:inputcolumn2", - metavar='', - default="", - type=str, - help="Eventually, the name of the second input column.") - - group.add_argument('--input-elt-1', '-e', - action="store", dest="obi:inputelement1", - metavar='', - default="", - type=str, - help="If the first input column has multiple elements per line, name of the element referring to the sequence to align. " - " Default: the first element of the line.") - - group.add_argument('--input-elt-2', '-E', - action="store", dest="obi:inputelement2", - metavar='', - default="", - type=str, - help="If the second input column has multiple elements per line, name of the element referring to the sequence to align. " - " Default: the first element of the line.") - - group.add_argument('--id-column-1', '-f', - action="store", dest="obi:idcolumn1", - metavar='', - default="", - type=str, - help="Name of the (first) column containing the identifiers of the sequences to align. " - " Default: the default ID column of the view if there is one.") - - group.add_argument('--id-column-2', '-F', - action="store", dest="obi:idcolumn2", - metavar='', - default="", - type=str, - help="Eventually, the name of the second ID column.") - - group.add_argument('--output-view', '-o', - action="store", dest="obi:outputview", - metavar='', - default=None, - type=str, - help="Name of the output view.") - - - group=parser.add_argument_group('obi lcs specific options') - - group.add_argument('--threshold','-t', - action="store", dest="align:threshold", - metavar='', - default=0.0, - type=float, - help="Score threshold. If the score is normalized and expressed in similarity (default)," - " it is an identity, e.g. 0.95 for an identity of 95%%. If the score is normalized" - " and expressed in distance, it is (1.0 - identity), e.g. 0.05 for an identity of 95%%." - " If the score is not normalized and expressed in similarity, it is the length of the" - " Longest Common Subsequence. If the score is not normalized and expressed in distance," - " it is (reference length - LCS length)." - " Only sequence pairs with a similarity above are printed. Default: 0.00" - " (no threshold).") - - group.add_argument('--longest-length','-L', - action="store_const", dest="align:reflength", - default=0, - const=1, - help="The reference length is the length of the longest sequence." - " Default: the reference length is the length of the alignment.") - - group.add_argument('--shortest-length','-l', - action="store_const", dest="align:reflength", - default=0, - const=2, - help="The reference length is the length of the shortest sequence." - " Default: the reference length is the length of the alignment.") - - group.add_argument('--raw','-r', - action="store_false", dest="align:normalize", - default=True, - help="Raw score, not normalized. Default: score is normalized with the reference sequence length.") - - group.add_argument('--distance','-D', - action="store_false", dest="align:similarity", - default=True, - help="Score is expressed in distance. Default: score is expressed in similarity.") - - group.add_argument('--print-seq','-s', - action="store_true", dest="align:printseq", - default=False, - help="The nucleotide sequences are written in the output view. Default: they are not written.") - - group.add_argument('--print-count','-n', - action="store_true", dest="align:printcount", - default=False, - help="Sequence counts are written in the output view. Default: they are not written.") - - group.add_argument('--thread-count','-p', # TODO should probably be in a specific option group - action="store", dest="align:threadcount", - metavar='', - default=1, - type=int, - help="Number of threads to use for the computation. Default: one.") - - -# cpdef align(str dms_n, -# str input_view_1_n, str output_view_n, -# str input_view_2_n="", -# str input_column_1_n="", str input_column_2_n="", -# str input_elt_1_n="", str input_elt_2_n="", -# str id_column_1_n="", str id_column_2_n="", -# double threshold=0.0, bint normalize=True, -# int reference=0, bint similarity_mode=True, -# bint print_seq=False, bint print_count=False, -# comments="", -# int thread_count=1) : -# -# cdef OBIDMS d -# d = OBIDMS(dms_n) # -# if input_view_2_n == "" and input_column_2_n == "" : -# if obi_lcs_align_one_column(d._pointer, \ -# str2bytes(input_view_1_n), \ -# str2bytes(input_column_1_n), \ -# str2bytes(input_elt_1_n), \ -# str2bytes(id_column_1_n), \ -# str2bytes(output_view_n), \ -# str2bytes(comments), \ -# print_seq, \ -# print_count, \ -# threshold, normalize, reference, similarity_mode, -# thread_count) < 0 : -# raise Exception("Error aligning sequences") -# else : -# if obi_lcs_align_two_columns(d._pointer, \ -# str2bytes(input_view_1_n), \ -# str2bytes(input_view_2_n), \ -# str2bytes(input_column_1_n), \ -# str2bytes(input_column_2_n), \ -# str2bytes(input_elt_1_n), \ -# str2bytes(input_elt_2_n), \ -# str2bytes(id_column_1_n), \ -# str2bytes(id_column_2_n), \ -# str2bytes(output_view_n), \ -# str2bytes(comments), \ -# print_seq, \ -# print_count, \ -# threshold, normalize, reference, similarity_mode) < 0 : -# raise Exception("Error aligning sequences") -# -# d.close() +# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport +# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work +# from obitools3.utils cimport str2bytes +# +# from obitools3.dms.capi.obialign cimport obi_lcs_align_one_column, \ +# obi_lcs_align_two_columns # # -# def run(config): -# -# # TODO: Build formatted comments with all parameters etc +# import time +# +# __title__="Aligns one sequence column with itself or two sequence columns" +# +# +# default_config = { 'inputview' : None, +# } +# +# def addOptions(parser): +# +# # TODO put this common group somewhere else but I don't know where. +# # Also some options should probably be in another group +# group=parser.add_argument_group('DMS and view options') +# +# group.add_argument('--default-dms', '-d', +# action="store", dest="obi:defaultdms", +# metavar='', +# default=None, +# type=str, +# help="Name of the default DMS for reading and writing data.") +# +# group.add_argument('--input-view-1', '-i', +# action="store", dest="obi:inputview1", +# metavar='', +# default=None, +# type=str, +# help="Name of the (first) input view.") +# +# group.add_argument('--input-view-2', '-I', +# action="store", dest="obi:inputview2", +# metavar='', +# default="", +# type=str, +# help="Eventually, the name of the second input view.") +# +# group.add_argument('--input-column-1', '-c', +# action="store", dest="obi:inputcolumn1", +# metavar='', +# default="", +# type=str, +# help="Name of the (first) input column. " +# " Default: the default nucleotide sequence column of the view if there is one.") +# +# group.add_argument('--input-column-2', '-C', +# action="store", dest="obi:inputcolumn2", +# metavar='', +# default="", +# type=str, +# help="Eventually, the name of the second input column.") +# +# group.add_argument('--input-elt-1', '-e', +# action="store", dest="obi:inputelement1", +# metavar='', +# default="", +# type=str, +# help="If the first input column has multiple elements per line, name of the element referring to the sequence to align. " +# " Default: the first element of the line.") +# +# group.add_argument('--input-elt-2', '-E', +# action="store", dest="obi:inputelement2", +# metavar='', +# default="", +# type=str, +# help="If the second input column has multiple elements per line, name of the element referring to the sequence to align. " +# " Default: the first element of the line.") +# +# group.add_argument('--id-column-1', '-f', +# action="store", dest="obi:idcolumn1", +# metavar='', +# default="", +# type=str, +# help="Name of the (first) column containing the identifiers of the sequences to align. " +# " Default: the default ID column of the view if there is one.") +# +# group.add_argument('--id-column-2', '-F', +# action="store", dest="obi:idcolumn2", +# metavar='', +# default="", +# type=str, +# help="Eventually, the name of the second ID column.") +# +# group.add_argument('--output-view', '-o', +# action="store", dest="obi:outputview", +# metavar='', +# default=None, +# type=str, +# help="Name of the output view.") +# +# +# group=parser.add_argument_group('obi lcs specific options') +# +# group.add_argument('--threshold','-t', +# action="store", dest="align:threshold", +# metavar='', +# default=0.0, +# type=float, +# help="Score threshold. If the score is normalized and expressed in similarity (default)," +# " it is an identity, e.g. 0.95 for an identity of 95%%. If the score is normalized" +# " and expressed in distance, it is (1.0 - identity), e.g. 0.05 for an identity of 95%%." +# " If the score is not normalized and expressed in similarity, it is the length of the" +# " Longest Common Subsequence. If the score is not normalized and expressed in distance," +# " it is (reference length - LCS length)." +# " Only sequence pairs with a similarity above are printed. Default: 0.00" +# " (no threshold).") +# +# group.add_argument('--longest-length','-L', +# action="store_const", dest="align:reflength", +# default=0, +# const=1, +# help="The reference length is the length of the longest sequence." +# " Default: the reference length is the length of the alignment.") +# +# group.add_argument('--shortest-length','-l', +# action="store_const", dest="align:reflength", +# default=0, +# const=2, +# help="The reference length is the length of the shortest sequence." +# " Default: the reference length is the length of the alignment.") +# +# group.add_argument('--raw','-r', +# action="store_false", dest="align:normalize", +# default=True, +# help="Raw score, not normalized. Default: score is normalized with the reference sequence length.") +# +# group.add_argument('--distance','-D', +# action="store_false", dest="align:similarity", +# default=True, +# help="Score is expressed in distance. Default: score is expressed in similarity.") +# +# group.add_argument('--print-seq','-s', +# action="store_true", dest="align:printseq", +# default=False, +# help="The nucleotide sequences are written in the output view. Default: they are not written.") +# +# group.add_argument('--print-count','-n', +# action="store_true", dest="align:printcount", +# default=False, +# help="Sequence counts are written in the output view. Default: they are not written.") +# +# group.add_argument('--thread-count','-p', # TODO should probably be in a specific option group +# action="store", dest="align:threadcount", +# metavar='', +# default=1, +# type=int, +# help="Number of threads to use for the computation. Default: one.") +# +# +# # cpdef align(str dms_n, +# # str input_view_1_n, str output_view_n, +# # str input_view_2_n="", +# # str input_column_1_n="", str input_column_2_n="", +# # str input_elt_1_n="", str input_elt_2_n="", +# # str id_column_1_n="", str id_column_2_n="", +# # double threshold=0.0, bint normalize=True, +# # int reference=0, bint similarity_mode=True, +# # bint print_seq=False, bint print_count=False, +# # comments="", +# # int thread_count=1) : +# # +# # cdef OBIDMS d +# # d = OBIDMS(dms_n) +# # +# # if input_view_2_n == "" and input_column_2_n == "" : +# # if obi_lcs_align_one_column(d._pointer, \ +# # str2bytes(input_view_1_n), \ +# # str2bytes(input_column_1_n), \ +# # str2bytes(input_elt_1_n), \ +# # str2bytes(id_column_1_n), \ +# # str2bytes(output_view_n), \ +# # str2bytes(comments), \ +# # print_seq, \ +# # print_count, \ +# # threshold, normalize, reference, similarity_mode, +# # thread_count) < 0 : +# # raise Exception("Error aligning sequences") +# # else : +# # if obi_lcs_align_two_columns(d._pointer, \ +# # str2bytes(input_view_1_n), \ +# # str2bytes(input_view_2_n), \ +# # str2bytes(input_column_1_n), \ +# # str2bytes(input_column_2_n), \ +# # str2bytes(input_elt_1_n), \ +# # str2bytes(input_elt_2_n), \ +# # str2bytes(id_column_1_n), \ +# # str2bytes(id_column_2_n), \ +# # str2bytes(output_view_n), \ +# # str2bytes(comments), \ +# # print_seq, \ +# # print_count, \ +# # threshold, normalize, reference, similarity_mode) < 0 : +# # raise Exception("Error aligning sequences") +# # +# # d.close() +# # +# # +def run(config): + pass + # TODO: Build formatted comments with all parameters etc # comments = "Obi align" -# +# # # Call cython alignment function # align(config['obi']['defaultdms'], \ # config['obi']['inputview1'], \ @@ -227,10 +227,10 @@ def addOptions(parser): # print_count = config['align']['printcount'], \ # comments = comments, \ # thread_count = config['align']['threadcount']) -# +# # print("Done.") -# -# -# -# -# \ No newline at end of file +# # +# # +# # +# # +# # \ No newline at end of file diff --git a/python/obitools3/commands/test.pyx b/python/obitools3/commands/test.pyx index 5277e82..4495c80 100644 --- a/python/obitools3/commands/test.pyx +++ b/python/obitools3/commands/test.pyx @@ -1,383 +1,393 @@ from obitools3.apps.progress cimport ProgressBar # TODO I absolutely don't understand why it doesn't work without that line -from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work -# from obitools3.obidms._obidms import OBIView_line_selection -# from obitools3.utils cimport str2bytes -# -# import shutil -# import string -# import random -# -# -# VIEW_TYPES = ["", "NUC_SEQS_VIEW"] -# COL_TYPES = ["OBI_BOOL", "OBI_CHAR", "OBI_FLOAT", "OBI_INT", "OBI_SEQ", "OBI_STR"] -# NUC_SEQUENCE_COLUMN = "NUC_SEQ" -# ID_COLUMN = "ID" -# DEFINITION_COLUMN = "DEFINITION" -# QUALITY_COLUMN = "QUALITY" -# SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN] -# -# -# NAME_MAX_LEN = 200 -# COL_COMMENTS_MAX_LEN = 2048 -# MAX_INT = 2147483647 # used to generate random float values -# -# -# __title__="Tests if the obitools are working properly" -# -# -# default_config = { -# } -# -# -# def random_length(max_len): -# return random.randint(1, max_len) -# -# -# def random_bool(config): -# return random.choice([True, False]) -# -# -# def random_char(config): -# return random.choice(string.ascii_lowercase) -# -# -# def random_float(config): -# return random.randint(0, MAX_INT) + random.random() -# -# -# def random_int(config): -# return random.randint(0, config['test']['maxlinenb']) -# -# -# def random_seq(config): -# return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen'])))) -# -# -# def random_str(config): -# return random_str_with_max_len(config['test']['strmaxlen']) -# -# -# def random_str_with_max_len(max_len): -# return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len))) -# -# -# def random_column(infos): -# return random.choice(sorted(list(infos['view'].columns))) -# -# -# def random_unique_name(infos): -# name = "" -# while name == "" or name in infos['unique_names'] : -# name = random_str_with_max_len(NAME_MAX_LEN) -# infos['unique_names'].append(name) -# return name -# -# -# def random_unique_element_name(config, infos): -# name = "" -# while name == "" or name in infos['unique_names'] : -# name = random_str_with_max_len(config['test']['elt_name_max_len']) -# infos['unique_names'].append(name) -# return name -# -# -# def print_test(config, sentence): -# if config['test']['verbose'] : -# print(sentence) -# -# -# def test_set_and_get(config, infos): -# print_test(config, ">>> Set and get test") -# col_name = random_column(infos) -# col = infos['view'][col_name] -# element_names = col.elements_names -# data_type = col.data_type -# if data_type == "OBI_QUAL" : -# print_test(config, "-") -# return -# idx = random_int(config) -# value = infos['random_generator'][data_type](config) -# if col.nb_elements_per_line > 1 : -# elt = random.choice(element_names) -# col[idx][elt] = value -# assert col[idx][elt] == value, "Set value != gotten value "+str(col[idx][elt])+" != "+str(value) -# else: -# col[idx] = value -# assert col[idx] == value, "Set value != gotten value "+str(col[idx])+" != "+str(value) -# -# print_test(config, ">>> Set and get test OK") -# -# -# def test_add_col(config, infos): -# print_test(config, ">>> Add column test") -# #existing_col = random_bool(config) # TODO doesn't work because of line count problem. See obiview.c line 1737 -# #if existing_col and infos["view_names"] != [] : -# # random_view = infos['dms'].open_view(random.choice(infos["view_names"])) -# # random_column = random_view[random.choice(sorted(list(random_view.columns))] -# # random_column_refs = random_column.refs -# # if random_column_refs['name'] in infos['view'] : -# # alias = random_unique_name(infos) -# # else : -# # alias = '' -# # infos['view'].add_column(random_column_refs['name'], version_number=random_column_refs['version'], alias=alias, create=False) -# # random_view.close() -# #else : -# create_random_column(config, infos) -# print_test(config, ">>> Add column test OK") -# -# -# def test_delete_col(config, infos): -# print_test(config, ">>> Delete column test") -# if len(list(infos['view'].columns)) <= 1 : -# print_test(config, "-") -# return -# col_name = random_column(infos) -# if col_name in SPECIAL_COLUMNS : -# print_test(config, "-") -# return -# infos['view'].delete_column(col_name) -# print_test(config, ">>> Delete column test OK") -# -# -# def test_col_alias(config, infos): -# print_test(config, ">>> Changing column alias test") -# col_name = random_column(infos) -# if col_name in SPECIAL_COLUMNS : -# print_test(config, "-") -# return -# infos['view'][col_name].alias = random_unique_name(infos) -# print_test(config, ">>> Changing column alias test OK") -# -# -# def test_new_view(config, infos): -# print_test(config, ">>> New view test") -# random_new_view(config, infos) -# print_test(config, ">>> New view test OK") -# -# -# def random_test(config, infos): -# return random.choice(infos['tests'])(config, infos) -# -# -# def random_view_type(): -# return random.choice(VIEW_TYPES) -# -# -# def random_col_type(): -# return random.choice(COL_TYPES) -# -# -# def fill_column(config, infos, col) : -# data_type = col.data_type -# element_names = col.elements_names -# -# if len(element_names) > 1 : -# for i in range(random_int(config)) : -# for j in range(len(element_names)) : -# col[i][element_names[j]] = infos['random_generator'][data_type](config) -# else : -# for i in range(random_int(config)) : -# col[i] = infos['random_generator'][data_type](config) -# -# -# def create_random_column(config, infos) : -# alias = random.choice(['', random_unique_name(infos)]) -# nb_elements_per_line=random.randint(1, config['test']['maxelts']) -# elements_names = [] -# for i in range(nb_elements_per_line) : -# elements_names.append(random_unique_element_name(config, infos)) -# elements_names = random.choice([None, elements_names]) -# name = random_unique_name(infos) -# infos['view'].add_column(name, -# alias=alias, -# type=random_col_type(), -# nb_elements_per_line=nb_elements_per_line, -# elements_names=elements_names, -# indexer_name=random.choice(['', random_unique_name(infos)]), -# comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN), -# create=True -# ) -# if alias != '' : -# return infos['view'][alias] -# else : -# return infos['view'][name] -# -# -# def fill_view(config, infos): -# for i in range(random.randint(1, config['test']['maxinicolcount'])) : -# col = create_random_column(config, infos) -# fill_column(config, infos, col) -# -# -# def random_new_view(config, infos, first=False): -# v_to_clone = None -# line_selection = None -# quality_col = False # TODO -# if not first: -# infos['view_names'].append(infos['view'].name) -# infos['view'].close() -# v_to_clone = infos['dms'].open_view(random.choice(infos["view_names"])) -# v_type = "" -# print_test(config, "View to clone: ") -# print_test(config, repr(v_to_clone)) -# create_line_selection = random_bool(config) -# if create_line_selection and v_to_clone.line_count > 0: -# print_test(config, "New view with new line selection.") -# line_selection = OBIView_line_selection(v_to_clone) -# for i in range(random.randint(1, v_to_clone.line_count)) : -# line_selection.append(random.randint(0, v_to_clone.line_count-1)) -# #print_test(config, "New line selection: "+str(line_selection)) -# else : -# v_type = random_view_type() -# -# if line_selection is not None : -# infos['view'] = infos['dms'].clone_view_with_line_selection(random_unique_name(infos), line_selection, comments=random_str_with_max_len(config['test']['commentsmaxlen'])) -# elif v_to_clone is not None : -# infos['view'] = infos['dms'].clone_view(random_unique_name(infos), v_to_clone, comments=random_str_with_max_len(config['test']['commentsmaxlen'])) -# else : -# infos['view'] = infos['dms'].new_view(random_unique_name(infos), view_type=v_type, comments=random_str_with_max_len(config['test']['commentsmaxlen']), quality_column=quality_col) -# -# print_test(config, repr(infos['view'])) -# if v_to_clone is not None : -# if line_selection is None: -# assert v_to_clone.line_count == infos['view'].line_count, "New view and cloned view don't have the same line count" -# else : -# assert len(line_selection) == infos['view'].line_count, "New view with new line selection does not have the right line count" -# v_to_clone.close() -# if first : -# fill_view(config, infos) -# -# -# def create_test_obidms(config, infos): -# infos['dms'] = OBIDMS(config['obi']['defaultdms']) -# -# -# def ini_dms_and_first_view(config, infos): -# create_test_obidms(config, infos) -# random_new_view(config, infos, first=True) -# infos['view_names'] = [] -# -# -# def addOptions(parser): -# -# # TODO put this common group somewhere else but I don't know where -# group=parser.add_argument_group('DMS and view options') -# -# group.add_argument('--default-dms','-d', -# action="store", dest="obi:defaultdms", -# metavar='', -# default="/tmp/test_dms", -# type=str, -# help="Name of the default DMS for reading and writing data. " -# "Default: /tmp/test_dms") -# -# group=parser.add_argument_group('obi test specific options') -# -# group.add_argument('--nb_tests','-n', -# action="store", dest="test:nbtests", -# metavar='', -# default=1000, -# type=int, -# help="Number of tests to carry out. " -# "Default: 1000") -# -# group.add_argument('--seq_max_len','-s', -# action="store", dest="test:seqmaxlen", -# metavar='', -# default=200, -# type=int, -# help="Maximum length of DNA sequences. " -# "Default: 200") -# -# group.add_argument('--str_max_len','-t', -# action="store", dest="test:strmaxlen", -# metavar='', -# default=200, -# type=int, -# help="Maximum length of character strings. " -# "Default: 200") -# -# group.add_argument('--comments_max_len','-c', -# action="store", dest="test:commentsmaxlen", -# metavar='', -# default=10000, -# type=int, -# help="Maximum length of view comments. " -# "Default: 10000") -# -# group.add_argument('--max_ini_col_count','-o', -# action="store", dest="test:maxinicolcount", -# metavar='', -# default=10, -# type=int, -# help="Maximum number of columns in the initial view. " -# "Default: 10") -# -# group.add_argument('--max_line_nb','-l', -# action="store", dest="test:maxlinenb", -# metavar='', -# default=10000, -# type=int, -# help="Maximum number of lines in a column. " -# "Default: 10000") -# -# group.add_argument('--max_elts_per_line','-e', -# action="store", dest="test:maxelts", -# metavar='', -# default=20, -# type=int, -# help="Maximum number of elements per line in a column. " -# "Default: 20") -# -# group.add_argument('--verbose','-v', -# action="store_true", dest="test:verbose", -# default=False, -# help="Print the tests. " -# "Default: Don't print the tests") -# -# group.add_argument('--seed','-g', -# action="store", dest="test:seed", -# metavar='', -# default=None, -# help="Seed (use for reproducible tests). " -# "Default: Seed is determined by Python") -# -# def run(config): -# -# if 'seed' in config['test'] : -# random.seed(config['test']['seed']) -# -# infos = {'dms': None, -# 'view': None, -# 'view_names': None, -# 'unique_names': [], -# 'random_generator': {"OBI_BOOL": random_bool, "OBI_CHAR": random_char, "OBI_FLOAT": random_float, "OBI_INT": random_int, "OBI_SEQ": random_seq, "OBI_STR": random_str}, -# 'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias, test_new_view] -# } -# -# config['test']['elt_name_max_len'] = int((COL_COMMENTS_MAX_LEN - config['test']['maxelts']) / config['test']['maxelts']) -# -# print("Initializing the DMS and the first view...") -# -# shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True) -# -# ini_dms_and_first_view(config, infos) -# print_test(config, repr(infos['view'])) -# -# i = 0 -# for t in range(config['test']['nbtests']): -# random_test(config, infos) -# print_test(config, repr(infos['view'])) -# i+=1 -# if (i%(config['test']['nbtests']/10)) == 0 : -# print("Testing......"+str(i*100/config['test']['nbtests'])+"%") -# -# #print(infos) -# -# infos['view'].close() -# infos['dms'].close() -# shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True) -# -# print("Done.") +from obitools3.dms.view.view import View, Line_selection +from obitools3.dms.dms import DMS +from obitools3.dms.column import Column +from obitools3.utils cimport str2bytes + +import shutil +import string +import random + + +VIEW_TYPES = [""] #, "NUC_SEQS_VIEW"] +#COL_TYPES = ["OBI_BOOL", "OBI_CHAR", "OBI_FLOAT", "OBI_INT", "OBI_SEQ", "OBI_STR"] +COL_TYPES = [1, 2, 3, 4, 6, 7] +NUC_SEQUENCE_COLUMN = "NUC_SEQ" +ID_COLUMN = "ID" +DEFINITION_COLUMN = "DEFINITION" +QUALITY_COLUMN = "QUALITY" +SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN] + + +NAME_MAX_LEN = 200 +COL_COMMENTS_MAX_LEN = 2048 +MAX_INT = 2147483647 # used to generate random float values + + +__title__="Tests if the obitools are working properly" + + +default_config = { + } + + +def random_length(max_len): + return random.randint(1, max_len) + + +def random_bool(config): + return random.choice([True, False]) + + +def random_char(config): + return str2bytes(random.choice(string.ascii_lowercase)) + + +def random_float(config): + return random.randint(0, MAX_INT) + random.random() + + +def random_int(config): + return random.randint(0, config['test']['maxlinenb']) + + +def random_seq(config): + return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen'])))) + + +def random_bytes(config): + return random_bytes_with_max_len(config['test']['strmaxlen']) + + +def random_str_with_max_len(max_len): + return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len))) - \ No newline at end of file + +def random_bytes_with_max_len(max_len): + return str2bytes(''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len)))) + + +def random_column(infos): + return random.choice(sorted(list(infos['view'].keys()))) + + +def random_unique_name(infos): + name = "" + while name == "" or name in infos['unique_names'] : + name = random_str_with_max_len(NAME_MAX_LEN) + infos['unique_names'].append(name) + return name + + +def random_unique_element_name(config, infos): + name = "" + while name == "" or name in infos['unique_names'] : + name = random_str_with_max_len(config['test']['elt_name_max_len']) + infos['unique_names'].append(name) + return name + + +def print_test(config, sentence): + if config['test']['verbose'] : + print(sentence) + + +def test_set_and_get(config, infos): + print_test(config, ">>> Set and get test") + col_name = random_column(infos) + col = infos['view'][col_name] + element_names = col.elements_names + data_type = col.data_type + if data_type == "OBI_QUAL" : + print_test(config, "-") + return + idx = random_int(config) + value = random.choice([None, infos['random_generator'][data_type](config)]) + if col.nb_elements_per_line > 1 : + elt = random.choice(element_names) + col[idx][elt] = value + assert col[idx][elt] == value, "Column: "+repr(col)+"\nSet value != gotten value "+str(value)+" != "+str(col[idx][elt]) + else: + col[idx] = value + assert col[idx] == value, "Column: "+repr(col)+"\nSet value != gotten value "+str(value)+" != "+str(col[idx]) + + print_test(config, ">>> Set and get test OK") + + +def test_add_col(config, infos): + print_test(config, ">>> Add column test") + #existing_col = random_bool(config) # TODO doesn't work because of line count problem. See obiview.c line 1737 + #if existing_col and infos["view_names"] != [] : + # random_view = infos['dms'].open_view(random.choice(infos["view_names"])) + # random_column = random_view[random.choice(sorted(list(random_view.columns))] + # random_column_refs = random_column.refs + # if random_column_refs['name'] in infos['view'] : + # alias = random_unique_name(infos) + # else : + # alias = '' + # infos['view'].add_column(random_column_refs['name'], version_number=random_column_refs['version'], alias=alias, create=False) + # random_view.close() + #else : + create_random_column(config, infos) + print_test(config, ">>> Add column test OK") + + +def test_delete_col(config, infos): + print_test(config, ">>> Delete column test") + if len(list(infos['view'].keys())) <= 1 : + print_test(config, "-") + return + col_name = random_column(infos) + if col_name in SPECIAL_COLUMNS : + print_test(config, "-") + return + infos['view'].delete_column(col_name) + print_test(config, ">>> Delete column test OK") + + +def test_col_alias(config, infos): + print_test(config, ">>> Changing column alias test") + col_name = random_column(infos) + if col_name in SPECIAL_COLUMNS : + print_test(config, "-") + return + infos['view'][col_name].name = random_unique_name(infos) + print_test(config, ">>> Changing column alias test OK") + + +def test_new_view(config, infos): + print_test(config, ">>> New view test") + random_new_view(config, infos) + print_test(config, ">>> New view test OK") + + +def random_test(config, infos): + return random.choice(infos['tests'])(config, infos) + + +def random_view_type(): + return random.choice(VIEW_TYPES) + + +def random_col_type(): + return random.choice(COL_TYPES) + + +def fill_column(config, infos, col) : + data_type = col.data_type + element_names = col.elements_names + + if len(element_names) > 1 : + for i in range(random_int(config)) : + for j in range(len(element_names)) : + col[i][element_names[j]] = random.choice([None, infos['random_generator'][data_type](config)]) + else : + for i in range(random_int(config)) : + col[i] = random.choice([None, infos['random_generator'][data_type](config)]) + + +def create_random_column(config, infos) : + alias = random.choice(['', random_unique_name(infos)]) + nb_elements_per_line=random.randint(1, config['test']['maxelts']) + elements_names = [] + for i in range(nb_elements_per_line) : + elements_names.append(random_unique_element_name(config, infos)) + elements_names = random.choice([None, elements_names]) + name = random_unique_name(infos) + data_type = random_col_type() + + column = Column.new_column(infos['view'], + name, + data_type, + nb_elements_per_line=nb_elements_per_line, + elements_names=elements_names, + comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN), + alias=alias + ) + + if alias != '' : + assert infos['view'][alias] == column + else : + assert infos['view'][name] == column + + return column + + +def fill_view(config, infos): + for i in range(random.randint(1, config['test']['maxinicolcount'])) : + col = create_random_column(config, infos) + fill_column(config, infos, col) + + +def random_new_view(config, infos, first=False): + v_to_clone = None + line_selection = None + quality_col = False # TODO + if not first: + infos['view_names'].append(infos['view'].name) + infos['view'].close() + v_to_clone = View.open(infos['dms'], random.choice(infos["view_names"])) + v_type = "" + print_test(config, "View to clone: ") + print_test(config, repr(v_to_clone)) + create_line_selection = random_bool(config) + if create_line_selection and v_to_clone.line_count > 0: + print_test(config, "New view with new line selection.") + line_selection = Line_selection(v_to_clone) + for i in range(random.randint(1, v_to_clone.line_count)) : + line_selection.append(random.randint(0, v_to_clone.line_count-1)) + #print_test(config, "New line selection: "+str(line_selection)) + else : + v_type = random_view_type() + + if line_selection is not None : + infos['view'] = line_selection.materialize(random_unique_name(infos), comments=random_str_with_max_len(config['test']['commentsmaxlen'])) + elif v_to_clone is not None : + infos['view'] = v_to_clone.clone(random_unique_name(infos), comments=random_str_with_max_len(config['test']['commentsmaxlen'])) + else : + infos['view'] = View.new(infos['dms'], random_unique_name(infos), comments=random_str_with_max_len(config['test']['commentsmaxlen'])) # TODO random view class + + print_test(config, repr(infos['view'])) + if v_to_clone is not None : + if line_selection is None: + assert v_to_clone.line_count == infos['view'].line_count, "New view and cloned view don't have the same line count : "+str(v_to_clone.line_count)+" (view to clone line count) != "+str(infos['view'].line_count)+" (new view line count)" + else : + assert len(line_selection) == infos['view'].line_count, "New view with new line selection does not have the right line count : "+str(len(line_selection))+" (line selection length) != "+str(infos['view'].line_count)+" (new view line count)" + v_to_clone.close() + if first : + fill_view(config, infos) + + +def create_test_obidms(config, infos): + infos['dms'] = DMS.new(config['obi']['defaultdms']) + + +def ini_dms_and_first_view(config, infos): + create_test_obidms(config, infos) + random_new_view(config, infos, first=True) + infos['view_names'] = [] + + +def addOptions(parser): + + # TODO put this common group somewhere else but I don't know where + group=parser.add_argument_group('DMS and view options') + + group.add_argument('--default-dms','-d', + action="store", dest="obi:defaultdms", + metavar='', + default="/tmp/test_dms", + type=str, + help="Name of the default DMS for reading and writing data. " + "Default: /tmp/test_dms") + + group=parser.add_argument_group('obi test specific options') + + group.add_argument('--nb_tests','-n', + action="store", dest="test:nbtests", + metavar='', + default=1000, + type=int, + help="Number of tests to carry out. " + "Default: 1000") + + group.add_argument('--seq_max_len','-s', + action="store", dest="test:seqmaxlen", + metavar='', + default=200, + type=int, + help="Maximum length of DNA sequences. " + "Default: 200") + + group.add_argument('--str_max_len','-t', + action="store", dest="test:strmaxlen", + metavar='', + default=200, + type=int, + help="Maximum length of character strings. " + "Default: 200") + + group.add_argument('--comments_max_len','-c', + action="store", dest="test:commentsmaxlen", + metavar='', + default=10000, + type=int, + help="Maximum length of view comments. " + "Default: 10000") + + group.add_argument('--max_ini_col_count','-o', + action="store", dest="test:maxinicolcount", + metavar='', + default=10, + type=int, + help="Maximum number of columns in the initial view. " + "Default: 10") + + group.add_argument('--max_line_nb','-l', + action="store", dest="test:maxlinenb", + metavar='', + default=10000, + type=int, + help="Maximum number of lines in a column. " + "Default: 10000") + + group.add_argument('--max_elts_per_line','-e', + action="store", dest="test:maxelts", + metavar='', + default=20, + type=int, + help="Maximum number of elements per line in a column. " + "Default: 20") + + group.add_argument('--verbose','-v', + action="store_true", dest="test:verbose", + default=False, + help="Print the tests. " + "Default: Don't print the tests") + + group.add_argument('--seed','-g', + action="store", dest="test:seed", + metavar='', + default=None, + help="Seed (use for reproducible tests). " + "Default: Seed is determined by Python") + +def run(config): + + if 'seed' in config['test'] : + random.seed(config['test']['seed']) + + infos = {'dms': None, + 'view': None, + 'view_names': None, + 'unique_names': [], + 'random_generator': {b"OBI_BOOL": random_bool, b"OBI_CHAR": random_char, b"OBI_FLOAT": random_float, b"OBI_INT": random_int, b"OBI_SEQ": random_seq, b"OBI_STR": random_bytes}, + 'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias, test_new_view] + } + + config['test']['elt_name_max_len'] = int((COL_COMMENTS_MAX_LEN - config['test']['maxelts']) / config['test']['maxelts']) + + print("Initializing the DMS and the first view...") + + shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True) + + ini_dms_and_first_view(config, infos) + print_test(config, repr(infos['view'])) + + i = 0 + for t in range(config['test']['nbtests']): + random_test(config, infos) + print_test(config, repr(infos['view'])) + i+=1 + if (i%(config['test']['nbtests']/10)) == 0 : + print("Testing......"+str(i*100/config['test']['nbtests'])+"%") + + #print(infos) + + infos['view'].close() + infos['dms'].close() + shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True) + + print("Done.") + + \ No newline at end of file