Cython API: updated the test command for the new API and deactivated the
other commands for now
This commit is contained in:
@ -1,105 +1,109 @@
|
||||
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
from obitools3.utils cimport bytes2str
|
||||
# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
# from obitools3.utils cimport bytes2str
|
||||
#
|
||||
# import time
|
||||
# import re
|
||||
|
||||
import time
|
||||
import re
|
||||
|
||||
__title__="Export a NUC_SEQS view to a fasta or fastq file"
|
||||
|
||||
|
||||
default_config = { 'inputview' : None,
|
||||
}
|
||||
|
||||
def addOptions(parser):
|
||||
|
||||
# TODO put this common group somewhere else but I don't know where
|
||||
group=parser.add_argument_group('DMS and view options')
|
||||
|
||||
group.add_argument('--default-dms','-d',
|
||||
action="store", dest="obi:defaultdms",
|
||||
metavar='<DMS NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the default DMS for reading and writing data.")
|
||||
|
||||
group.add_argument('--input-view','-i',
|
||||
action="store", dest="obi:inputview",
|
||||
metavar='<INPUT VIEW NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the input view, either raw if the view is in the default DMS,"
|
||||
" or in the form 'dms:view' if it is in another DMS.")
|
||||
|
||||
group=parser.add_argument_group('obi export specific options')
|
||||
|
||||
group.add_argument('--format','-f',
|
||||
action="store", dest="export:format",
|
||||
metavar='<FORMAT>',
|
||||
default="fasta",
|
||||
type=str,
|
||||
help="Export in the format <FORMAT>, 'fasta' or 'fastq'. Default: 'fasta'.") # TODO export in csv
|
||||
|
||||
def run(config):
|
||||
pass
|
||||
|
||||
# TODO import doesn't work
|
||||
NUC_SEQUENCE_COLUMN = "NUC_SEQ"
|
||||
ID_COLUMN = "ID"
|
||||
DEFINITION_COLUMN = "DEFINITION"
|
||||
QUALITY_COLUMN = "QUALITY"
|
||||
|
||||
special_columns = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
|
||||
|
||||
# Open DMS
|
||||
d = OBIDMS(config['obi']['defaultdms'])
|
||||
|
||||
# Open input view
|
||||
iview = d.open_view(config['obi']['inputview'])
|
||||
|
||||
print(iview.type)
|
||||
|
||||
# TODO check that the view has the type NUC_SEQS
|
||||
if ((config['export']['format'] == "fasta") or (config['export']['format'] == "fastq")) and (iview.type != "NUC_SEQS_VIEW") : # TODO find a way to import those macros
|
||||
raise Exception("Error: the view to export in fasta or fastq format is not a NUC_SEQS view")
|
||||
|
||||
# Initialize the progress bar
|
||||
pb = ProgressBar(len(iview), config, seconde=5)
|
||||
|
||||
i=0
|
||||
for seq in iview :
|
||||
pb(i)
|
||||
|
||||
toprint = ">"+seq.id+" "
|
||||
|
||||
for col_name in seq :
|
||||
if col_name not in special_columns :
|
||||
toprint = toprint + col_name + "=" + str(seq[col_name]) + "; "
|
||||
|
||||
if DEFINITION_COLUMN in seq :
|
||||
toprint = toprint + seq.definition
|
||||
|
||||
nucseq = bytes2str(seq.nuc_seq)
|
||||
|
||||
if config['export']['format'] == "fasta" :
|
||||
nucseq = re.sub("(.{60})", "\\1\n", nucseq, 0, re.DOTALL)
|
||||
|
||||
toprint = toprint + "\n" + nucseq
|
||||
|
||||
if config['export']['format'] == "fastq" :
|
||||
toprint = toprint + "\n" + "+" + "\n" + seq.get_str_quality()
|
||||
|
||||
print(toprint)
|
||||
i+=1
|
||||
|
||||
iview.close()
|
||||
d.close()
|
||||
|
||||
print("Done.")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# __title__="Export a NUC_SEQS view to a fasta or fastq file"
|
||||
#
|
||||
#
|
||||
# default_config = { 'inputview' : None,
|
||||
# }
|
||||
#
|
||||
# def addOptions(parser):
|
||||
#
|
||||
# # TODO put this common group somewhere else but I don't know where
|
||||
# group=parser.add_argument_group('DMS and view options')
|
||||
#
|
||||
# group.add_argument('--default-dms','-d',
|
||||
# action="store", dest="obi:defaultdms",
|
||||
# metavar='<DMS NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the default DMS for reading and writing data.")
|
||||
#
|
||||
# group.add_argument('--input-view','-i',
|
||||
# action="store", dest="obi:inputview",
|
||||
# metavar='<INPUT VIEW NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the input view, either raw if the view is in the default DMS,"
|
||||
# " or in the form 'dms:view' if it is in another DMS.")
|
||||
#
|
||||
# group=parser.add_argument_group('obi export specific options')
|
||||
#
|
||||
# group.add_argument('--format','-f',
|
||||
# action="store", dest="export:format",
|
||||
# metavar='<FORMAT>',
|
||||
# default="fasta",
|
||||
# type=str,
|
||||
# help="Export in the format <FORMAT>, 'fasta' or 'fastq'. Default: 'fasta'.") # TODO export in csv
|
||||
#
|
||||
# def run(config):
|
||||
#
|
||||
# # TODO import doesn't work
|
||||
# NUC_SEQUENCE_COLUMN = "NUC_SEQ"
|
||||
# ID_COLUMN = "ID"
|
||||
# DEFINITION_COLUMN = "DEFINITION"
|
||||
# QUALITY_COLUMN = "QUALITY"
|
||||
#
|
||||
# special_columns = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
|
||||
#
|
||||
# # Open DMS
|
||||
# d = OBIDMS(config['obi']['defaultdms'])
|
||||
#
|
||||
# # Open input view
|
||||
# iview = d.open_view(config['obi']['inputview'])
|
||||
#
|
||||
# print(iview.type)
|
||||
#
|
||||
# # TODO check that the view has the type NUC_SEQS
|
||||
# if ((config['export']['format'] == "fasta") or (config['export']['format'] == "fastq")) and (iview.type != "NUC_SEQS_VIEW") : # TODO find a way to import those macros
|
||||
# raise Exception("Error: the view to export in fasta or fastq format is not a NUC_SEQS view")
|
||||
#
|
||||
# # Initialize the progress bar
|
||||
# pb = ProgressBar(len(iview), config, seconde=5)
|
||||
#
|
||||
# i=0
|
||||
# for seq in iview :
|
||||
# pb(i)
|
||||
#
|
||||
# toprint = ">"+seq.id+" "
|
||||
#
|
||||
# for col_name in seq :
|
||||
# if col_name not in special_columns :
|
||||
# toprint = toprint + col_name + "=" + str(seq[col_name]) + "; "
|
||||
#
|
||||
# if DEFINITION_COLUMN in seq :
|
||||
# toprint = toprint + seq.definition
|
||||
#
|
||||
# nucseq = bytes2str(seq.nuc_seq)
|
||||
#
|
||||
# if config['export']['format'] == "fasta" :
|
||||
# nucseq = re.sub("(.{60})", "\\1\n", nucseq, 0, re.DOTALL)
|
||||
#
|
||||
# toprint = toprint + "\n" + nucseq
|
||||
#
|
||||
# if config['export']['format'] == "fastq" :
|
||||
# toprint = toprint + "\n" + "+" + "\n" + seq.get_str_quality()
|
||||
#
|
||||
# print(toprint)
|
||||
# i+=1
|
||||
#
|
||||
# iview.close()
|
||||
# d.close()
|
||||
#
|
||||
# print("Done.")
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
@ -1,97 +1,100 @@
|
||||
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
#, OBIView, OBIView_line_selection # TODO cimport doesn't work
|
||||
|
||||
from functools import reduce
|
||||
import time
|
||||
|
||||
__title__="Grep view lines that match the given predicates"
|
||||
|
||||
|
||||
default_config = { 'inputview' : None,
|
||||
'outputview' : None
|
||||
}
|
||||
|
||||
def addOptions(parser):
|
||||
|
||||
# TODO put this common group somewhere else but I don't know where
|
||||
group=parser.add_argument_group('DMS and view options')
|
||||
|
||||
group.add_argument('--default-dms','-d',
|
||||
action="store", dest="obi:defaultdms",
|
||||
metavar='<DMS NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the default DMS for reading and writing data.")
|
||||
|
||||
group.add_argument('--input-view','-i',
|
||||
action="store", dest="obi:inputview",
|
||||
metavar='<INPUT VIEW NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the input view, either raw if the view is in the default DMS,"
|
||||
" or in the form 'dms:view' if it is in another DMS.")
|
||||
|
||||
group.add_argument('--output-view','-o',
|
||||
action="store", dest="obi:outputview",
|
||||
metavar='<OUTPUT VIEW NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the output view, either raw if the view is in the default DMS,"
|
||||
" or in the form 'dms:view' if it is in another DMS.")
|
||||
|
||||
|
||||
group=parser.add_argument_group('obi grep specific options')
|
||||
|
||||
group.add_argument('--predicate','-p',
|
||||
action="append", dest="grep:predicates",
|
||||
metavar='<PREDICATE>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Grep lines that match the given python expression on <line> or <sequence>.")
|
||||
|
||||
# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
# #, OBIView, OBIView_line_selection # TODO cimport doesn't work
|
||||
#
|
||||
# from functools import reduce
|
||||
# import time
|
||||
#
|
||||
# __title__="Grep view lines that match the given predicates"
|
||||
#
|
||||
|
||||
def run(config):
|
||||
pass
|
||||
|
||||
# Open DMS
|
||||
d = OBIDMS(config['obi']['defaultdms'])
|
||||
|
||||
# Open input view 1
|
||||
# iview = d.open_view(config['obi']['inputview'])
|
||||
# default_config = { 'inputview' : None,
|
||||
# 'outputview' : None
|
||||
# }
|
||||
#
|
||||
# def addOptions(parser):
|
||||
#
|
||||
# # TODO put this common group somewhere else but I don't know where
|
||||
# group=parser.add_argument_group('DMS and view options')
|
||||
#
|
||||
# group.add_argument('--default-dms','-d',
|
||||
# action="store", dest="obi:defaultdms",
|
||||
# metavar='<DMS NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the default DMS for reading and writing data.")
|
||||
#
|
||||
# group.add_argument('--input-view','-i',
|
||||
# action="store", dest="obi:inputview",
|
||||
# metavar='<INPUT VIEW NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the input view, either raw if the view is in the default DMS,"
|
||||
# " or in the form 'dms:view' if it is in another DMS.")
|
||||
#
|
||||
# group.add_argument('--output-view','-o',
|
||||
# action="store", dest="obi:outputview",
|
||||
# metavar='<OUTPUT VIEW NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the output view, either raw if the view is in the default DMS,"
|
||||
# " or in the form 'dms:view' if it is in another DMS.")
|
||||
#
|
||||
#
|
||||
# group=parser.add_argument_group('obi grep specific options')
|
||||
#
|
||||
# group.add_argument('--predicate','-p',
|
||||
# action="append", dest="grep:predicates",
|
||||
# metavar='<PREDICATE>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Grep lines that match the given python expression on <line> or <sequence>.")
|
||||
#
|
||||
#
|
||||
# def run(config):
|
||||
#
|
||||
# # Open DMS
|
||||
# d = OBIDMS(config['obi']['defaultdms'])
|
||||
#
|
||||
# # Open input view 1
|
||||
# # iview = d.open_view(config['obi']['inputview'])
|
||||
# #
|
||||
# # # Initialize the progress bar
|
||||
# # pb = ProgressBar(len(iview), config, seconde=5)
|
||||
# #
|
||||
# # # Apply filter
|
||||
# # selection = OBIView_line_selection(iview)
|
||||
# # for i in range(len(iview)) :
|
||||
# # pb(i)
|
||||
# # line = iview[i]
|
||||
# #
|
||||
# # loc_env = {'sequence': line, 'line': line} # TODO add taxonomy
|
||||
# #
|
||||
# # good = (reduce(lambda bint x, bint y: x and y,
|
||||
# # (bool(eval(p, loc_env, line))
|
||||
# # for p in config['grep']['predicates']), True))
|
||||
# #
|
||||
# # if good :
|
||||
# # selection.append(i)
|
||||
# #
|
||||
# # # Create output view with the line selection
|
||||
# # oview = d.clone_view_with_line_selection(config['obi']['outputview'], selection, comments="obi grep: "+str(config['grep']['predicates'])+"\n")
|
||||
# #
|
||||
# # #print("\n")
|
||||
# # #print(repr(oview))
|
||||
# #
|
||||
# # iview.close()
|
||||
# # oview.close()
|
||||
# d.close()
|
||||
#
|
||||
#
|
||||
# # Initialize the progress bar
|
||||
# pb = ProgressBar(len(iview), config, seconde=5)
|
||||
#
|
||||
# # Apply filter
|
||||
# selection = OBIView_line_selection(iview)
|
||||
# for i in range(len(iview)) :
|
||||
# pb(i)
|
||||
# line = iview[i]
|
||||
#
|
||||
# loc_env = {'sequence': line, 'line': line} # TODO add taxonomy
|
||||
#
|
||||
# good = (reduce(lambda bint x, bint y: x and y,
|
||||
# (bool(eval(p, loc_env, line))
|
||||
# for p in config['grep']['predicates']), True))
|
||||
#
|
||||
# if good :
|
||||
# selection.append(i)
|
||||
#
|
||||
# # Create output view with the line selection
|
||||
# oview = d.clone_view_with_line_selection(config['obi']['outputview'], selection, comments="obi grep: "+str(config['grep']['predicates'])+"\n")
|
||||
#
|
||||
# #print("\n")
|
||||
# #print(repr(oview))
|
||||
#
|
||||
# iview.close()
|
||||
# oview.close()
|
||||
d.close()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -1,130 +1,133 @@
|
||||
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
from obitools3.files.universalopener cimport uopen
|
||||
from obitools3.parsers.fasta import fastaIterator
|
||||
from obitools3.parsers.fastq import fastqIterator
|
||||
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
# from obitools3.files.universalopener cimport uopen
|
||||
# from obitools3.parsers.fasta import fastaIterator
|
||||
# from obitools3.parsers.fastq import fastqIterator
|
||||
# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
#
|
||||
# import time
|
||||
#
|
||||
|
||||
import time
|
||||
|
||||
|
||||
__title__="Counts sequences in a sequence set"
|
||||
|
||||
|
||||
default_config = { 'destview' : None,
|
||||
'skip' : 0,
|
||||
'only' : None,
|
||||
'skiperror' : False,
|
||||
'seqinformat' : None,
|
||||
'moltype' : 'nuc',
|
||||
'filename' : None
|
||||
}
|
||||
|
||||
def addOptions(parser):
|
||||
parser.add_argument(dest='import:filename',
|
||||
metavar='<FILENAME>',
|
||||
nargs='?',
|
||||
default=None,
|
||||
help='sequence file name to be imported' )
|
||||
|
||||
group=parser.add_argument_group('obi import specific options')
|
||||
|
||||
group.add_argument('--default-dms','-d',
|
||||
action="store", dest="obi:defaultdms",
|
||||
metavar='<DMS NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the default DMS for reading and writing data")
|
||||
|
||||
group.add_argument('--destination-view','-v',
|
||||
action="store", dest="import:destview",
|
||||
metavar='<VIEW NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
required=True,
|
||||
help="Name of the default DMS for reading and writing data")
|
||||
|
||||
group.add_argument('--skip',
|
||||
action="store", dest="import:skip",
|
||||
metavar='<N>',
|
||||
default=None,
|
||||
type=int,
|
||||
help="skip the N first sequences")
|
||||
|
||||
group.add_argument('--only',
|
||||
action="store", dest="import:only",
|
||||
metavar='<N>',
|
||||
default=None,
|
||||
type=int,
|
||||
help="treat only N sequences")
|
||||
|
||||
group.add_argument('--skip-on-error',
|
||||
action="store_true", dest="import:skiperror",
|
||||
default=None,
|
||||
help="Skip sequence entries with parse error")
|
||||
|
||||
group.add_argument('--fasta',
|
||||
action="store_const", dest="import:seqinformat",
|
||||
default=None,
|
||||
const='fasta',
|
||||
help="Input file is in fasta nucleic format (including obitools fasta extentions)")
|
||||
|
||||
group.add_argument('--fastq',
|
||||
action="store_const", dest="import:seqinformat",
|
||||
default=None,
|
||||
const='fastq',
|
||||
help="Input file is in sanger fastq nucleic format (standard fastq)")
|
||||
|
||||
group.add_argument('--nuc',
|
||||
action="store_const", dest="import:moltype",
|
||||
default=None,
|
||||
const='nuc',
|
||||
help="Input file contains nucleic sequences")
|
||||
|
||||
group.add_argument('--prot',
|
||||
action="store_const", dest="import:moltype",
|
||||
default=None,
|
||||
const='pep',
|
||||
help="Input file contains protein sequences")
|
||||
|
||||
|
||||
|
||||
# TODO: Handling of NA values
|
||||
def run(config):
|
||||
pb = ProgressBar(35000000, config, seconde=5) # TODO should be number of records in file
|
||||
pass
|
||||
|
||||
inputs = uopen(config['import']['filename'])
|
||||
|
||||
get_quality = False
|
||||
if config['import']['seqinformat']=='fasta':
|
||||
iseq = fastaIterator(inputs)
|
||||
view_type="NUC_SEQS_VIEW"
|
||||
elif config['import']['seqinformat']=='fastq':
|
||||
iseq = fastqIterator(inputs)
|
||||
view_type="NUC_SEQS_VIEW"
|
||||
get_quality = True
|
||||
else:
|
||||
raise RuntimeError('No file format specified')
|
||||
|
||||
# Create DMS
|
||||
d = OBIDMS(config['obi']['defaultdms'])
|
||||
|
||||
# Create view
|
||||
# view = d.new_view(config['import']['destview'], view_type=view_type, quality_column=get_quality)
|
||||
# __title__="Counts sequences in a sequence set"
|
||||
#
|
||||
# i = 0
|
||||
# for seq in iseq:
|
||||
# pb(i)
|
||||
# view[i].id = seq['id']
|
||||
# view[i].definition = seq['definition']
|
||||
# view[i].nuc_seq = seq['sequence']
|
||||
# if get_quality :
|
||||
# view[i].quality = seq['quality']
|
||||
# for tag in seq['tags'] :
|
||||
# view[i][tag] = seq['tags'][tag]
|
||||
# i+=1
|
||||
#
|
||||
# #print(view.__repr__())
|
||||
# default_config = { 'destview' : None,
|
||||
# 'skip' : 0,
|
||||
# 'only' : None,
|
||||
# 'skiperror' : False,
|
||||
# 'seqinformat' : None,
|
||||
# 'moltype' : 'nuc',
|
||||
# 'filename' : None
|
||||
# }
|
||||
#
|
||||
# def addOptions(parser):
|
||||
# parser.add_argument(dest='import:filename',
|
||||
# metavar='<FILENAME>',
|
||||
# nargs='?',
|
||||
# default=None,
|
||||
# help='sequence file name to be imported' )
|
||||
#
|
||||
# group=parser.add_argument_group('obi import specific options')
|
||||
#
|
||||
# group.add_argument('--default-dms','-d',
|
||||
# action="store", dest="obi:defaultdms",
|
||||
# metavar='<DMS NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the default DMS for reading and writing data")
|
||||
#
|
||||
# group.add_argument('--destination-view','-v',
|
||||
# action="store", dest="import:destview",
|
||||
# metavar='<VIEW NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# required=True,
|
||||
# help="Name of the default DMS for reading and writing data")
|
||||
#
|
||||
# group.add_argument('--skip',
|
||||
# action="store", dest="import:skip",
|
||||
# metavar='<N>',
|
||||
# default=None,
|
||||
# type=int,
|
||||
# help="skip the N first sequences")
|
||||
#
|
||||
# group.add_argument('--only',
|
||||
# action="store", dest="import:only",
|
||||
# metavar='<N>',
|
||||
# default=None,
|
||||
# type=int,
|
||||
# help="treat only N sequences")
|
||||
#
|
||||
# group.add_argument('--skip-on-error',
|
||||
# action="store_true", dest="import:skiperror",
|
||||
# default=None,
|
||||
# help="Skip sequence entries with parse error")
|
||||
#
|
||||
# group.add_argument('--fasta',
|
||||
# action="store_const", dest="import:seqinformat",
|
||||
# default=None,
|
||||
# const='fasta',
|
||||
# help="Input file is in fasta nucleic format (including obitools fasta extentions)")
|
||||
#
|
||||
# group.add_argument('--fastq',
|
||||
# action="store_const", dest="import:seqinformat",
|
||||
# default=None,
|
||||
# const='fastq',
|
||||
# help="Input file is in sanger fastq nucleic format (standard fastq)")
|
||||
#
|
||||
# group.add_argument('--nuc',
|
||||
# action="store_const", dest="import:moltype",
|
||||
# default=None,
|
||||
# const='nuc',
|
||||
# help="Input file contains nucleic sequences")
|
||||
#
|
||||
# group.add_argument('--prot',
|
||||
# action="store_const", dest="import:moltype",
|
||||
# default=None,
|
||||
# const='pep',
|
||||
# help="Input file contains protein sequences")
|
||||
#
|
||||
#
|
||||
#
|
||||
# # TODO: Handling of NA values
|
||||
# def run(config):
|
||||
# pb = ProgressBar(35000000, config, seconde=5) # TODO should be number of records in file
|
||||
#
|
||||
# inputs = uopen(config['import']['filename'])
|
||||
#
|
||||
# get_quality = False
|
||||
# if config['import']['seqinformat']=='fasta':
|
||||
# iseq = fastaIterator(inputs)
|
||||
# view_type="NUC_SEQS_VIEW"
|
||||
# elif config['import']['seqinformat']=='fastq':
|
||||
# iseq = fastqIterator(inputs)
|
||||
# view_type="NUC_SEQS_VIEW"
|
||||
# get_quality = True
|
||||
# else:
|
||||
# raise RuntimeError('No file format specified')
|
||||
#
|
||||
# # Create DMS
|
||||
# d = OBIDMS(config['obi']['defaultdms'])
|
||||
#
|
||||
# # Create view
|
||||
# # view = d.new_view(config['import']['destview'], view_type=view_type, quality_column=get_quality)
|
||||
# #
|
||||
# # i = 0
|
||||
# # for seq in iseq:
|
||||
# # pb(i)
|
||||
# # view[i].id = seq['id']
|
||||
# # view[i].definition = seq['definition']
|
||||
# # view[i].nuc_seq = seq['sequence']
|
||||
# # if get_quality :
|
||||
# # view[i].quality = seq['quality']
|
||||
# # for tag in seq['tags'] :
|
||||
# # view[i][tag] = seq['tags'][tag]
|
||||
# # i+=1
|
||||
# #
|
||||
# # #print(view.__repr__())
|
||||
# #
|
||||
# # view.close()
|
||||
# d.close()
|
||||
#
|
||||
# view.close()
|
||||
d.close()
|
||||
|
||||
|
@ -1,211 +1,211 @@
|
||||
#cython: language_level=3
|
||||
|
||||
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
from obitools3.utils cimport str2bytes
|
||||
|
||||
from obitools3.dms.capi.obialign cimport obi_lcs_align_one_column, \
|
||||
obi_lcs_align_two_columns
|
||||
|
||||
|
||||
import time
|
||||
|
||||
__title__="Aligns one sequence column with itself or two sequence columns"
|
||||
|
||||
|
||||
default_config = { 'inputview' : None,
|
||||
}
|
||||
|
||||
def addOptions(parser):
|
||||
|
||||
# TODO put this common group somewhere else but I don't know where.
|
||||
# Also some options should probably be in another group
|
||||
group=parser.add_argument_group('DMS and view options')
|
||||
|
||||
group.add_argument('--default-dms', '-d',
|
||||
action="store", dest="obi:defaultdms",
|
||||
metavar='<DMS NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the default DMS for reading and writing data.")
|
||||
|
||||
group.add_argument('--input-view-1', '-i',
|
||||
action="store", dest="obi:inputview1",
|
||||
metavar='<INPUT VIEW NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the (first) input view.")
|
||||
|
||||
group.add_argument('--input-view-2', '-I',
|
||||
action="store", dest="obi:inputview2",
|
||||
metavar='<INPUT VIEW NAME>',
|
||||
default="",
|
||||
type=str,
|
||||
help="Eventually, the name of the second input view.")
|
||||
|
||||
group.add_argument('--input-column-1', '-c',
|
||||
action="store", dest="obi:inputcolumn1",
|
||||
metavar='<INPUT COLUMN NAME>',
|
||||
default="",
|
||||
type=str,
|
||||
help="Name of the (first) input column. "
|
||||
" Default: the default nucleotide sequence column of the view if there is one.")
|
||||
|
||||
group.add_argument('--input-column-2', '-C',
|
||||
action="store", dest="obi:inputcolumn2",
|
||||
metavar='<INPUT COLUMN NAME>',
|
||||
default="",
|
||||
type=str,
|
||||
help="Eventually, the name of the second input column.")
|
||||
|
||||
group.add_argument('--input-elt-1', '-e',
|
||||
action="store", dest="obi:inputelement1",
|
||||
metavar='<INPUT ELEMENT NAME>',
|
||||
default="",
|
||||
type=str,
|
||||
help="If the first input column has multiple elements per line, name of the element referring to the sequence to align. "
|
||||
" Default: the first element of the line.")
|
||||
|
||||
group.add_argument('--input-elt-2', '-E',
|
||||
action="store", dest="obi:inputelement2",
|
||||
metavar='<INPUT ELEMENT NAME>',
|
||||
default="",
|
||||
type=str,
|
||||
help="If the second input column has multiple elements per line, name of the element referring to the sequence to align. "
|
||||
" Default: the first element of the line.")
|
||||
|
||||
group.add_argument('--id-column-1', '-f',
|
||||
action="store", dest="obi:idcolumn1",
|
||||
metavar='<ID COLUMN NAME>',
|
||||
default="",
|
||||
type=str,
|
||||
help="Name of the (first) column containing the identifiers of the sequences to align. "
|
||||
" Default: the default ID column of the view if there is one.")
|
||||
|
||||
group.add_argument('--id-column-2', '-F',
|
||||
action="store", dest="obi:idcolumn2",
|
||||
metavar='<ID COLUMN NAME>',
|
||||
default="",
|
||||
type=str,
|
||||
help="Eventually, the name of the second ID column.")
|
||||
|
||||
group.add_argument('--output-view', '-o',
|
||||
action="store", dest="obi:outputview",
|
||||
metavar='<OUTPUT VIEW NAME>',
|
||||
default=None,
|
||||
type=str,
|
||||
help="Name of the output view.")
|
||||
|
||||
|
||||
group=parser.add_argument_group('obi lcs specific options')
|
||||
|
||||
group.add_argument('--threshold','-t',
|
||||
action="store", dest="align:threshold",
|
||||
metavar='<THRESHOLD>',
|
||||
default=0.0,
|
||||
type=float,
|
||||
help="Score threshold. If the score is normalized and expressed in similarity (default),"
|
||||
" it is an identity, e.g. 0.95 for an identity of 95%%. If the score is normalized"
|
||||
" and expressed in distance, it is (1.0 - identity), e.g. 0.05 for an identity of 95%%."
|
||||
" If the score is not normalized and expressed in similarity, it is the length of the"
|
||||
" Longest Common Subsequence. If the score is not normalized and expressed in distance,"
|
||||
" it is (reference length - LCS length)."
|
||||
" Only sequence pairs with a similarity above <THRESHOLD> are printed. Default: 0.00"
|
||||
" (no threshold).")
|
||||
|
||||
group.add_argument('--longest-length','-L',
|
||||
action="store_const", dest="align:reflength",
|
||||
default=0,
|
||||
const=1,
|
||||
help="The reference length is the length of the longest sequence."
|
||||
" Default: the reference length is the length of the alignment.")
|
||||
|
||||
group.add_argument('--shortest-length','-l',
|
||||
action="store_const", dest="align:reflength",
|
||||
default=0,
|
||||
const=2,
|
||||
help="The reference length is the length of the shortest sequence."
|
||||
" Default: the reference length is the length of the alignment.")
|
||||
|
||||
group.add_argument('--raw','-r',
|
||||
action="store_false", dest="align:normalize",
|
||||
default=True,
|
||||
help="Raw score, not normalized. Default: score is normalized with the reference sequence length.")
|
||||
|
||||
group.add_argument('--distance','-D',
|
||||
action="store_false", dest="align:similarity",
|
||||
default=True,
|
||||
help="Score is expressed in distance. Default: score is expressed in similarity.")
|
||||
|
||||
group.add_argument('--print-seq','-s',
|
||||
action="store_true", dest="align:printseq",
|
||||
default=False,
|
||||
help="The nucleotide sequences are written in the output view. Default: they are not written.")
|
||||
|
||||
group.add_argument('--print-count','-n',
|
||||
action="store_true", dest="align:printcount",
|
||||
default=False,
|
||||
help="Sequence counts are written in the output view. Default: they are not written.")
|
||||
|
||||
group.add_argument('--thread-count','-p', # TODO should probably be in a specific option group
|
||||
action="store", dest="align:threadcount",
|
||||
metavar='<THREAD COUNT>',
|
||||
default=1,
|
||||
type=int,
|
||||
help="Number of threads to use for the computation. Default: one.")
|
||||
|
||||
|
||||
# cpdef align(str dms_n,
|
||||
# str input_view_1_n, str output_view_n,
|
||||
# str input_view_2_n="",
|
||||
# str input_column_1_n="", str input_column_2_n="",
|
||||
# str input_elt_1_n="", str input_elt_2_n="",
|
||||
# str id_column_1_n="", str id_column_2_n="",
|
||||
# double threshold=0.0, bint normalize=True,
|
||||
# int reference=0, bint similarity_mode=True,
|
||||
# bint print_seq=False, bint print_count=False,
|
||||
# comments="",
|
||||
# int thread_count=1) :
|
||||
#
|
||||
# cdef OBIDMS d
|
||||
# d = OBIDMS(dms_n)
|
||||
# from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
# from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
# from obitools3.utils cimport str2bytes
|
||||
#
|
||||
# if input_view_2_n == "" and input_column_2_n == "" :
|
||||
# if obi_lcs_align_one_column(d._pointer, \
|
||||
# str2bytes(input_view_1_n), \
|
||||
# str2bytes(input_column_1_n), \
|
||||
# str2bytes(input_elt_1_n), \
|
||||
# str2bytes(id_column_1_n), \
|
||||
# str2bytes(output_view_n), \
|
||||
# str2bytes(comments), \
|
||||
# print_seq, \
|
||||
# print_count, \
|
||||
# threshold, normalize, reference, similarity_mode,
|
||||
# thread_count) < 0 :
|
||||
# raise Exception("Error aligning sequences")
|
||||
# else :
|
||||
# if obi_lcs_align_two_columns(d._pointer, \
|
||||
# str2bytes(input_view_1_n), \
|
||||
# str2bytes(input_view_2_n), \
|
||||
# str2bytes(input_column_1_n), \
|
||||
# str2bytes(input_column_2_n), \
|
||||
# str2bytes(input_elt_1_n), \
|
||||
# str2bytes(input_elt_2_n), \
|
||||
# str2bytes(id_column_1_n), \
|
||||
# str2bytes(id_column_2_n), \
|
||||
# str2bytes(output_view_n), \
|
||||
# str2bytes(comments), \
|
||||
# print_seq, \
|
||||
# print_count, \
|
||||
# threshold, normalize, reference, similarity_mode) < 0 :
|
||||
# raise Exception("Error aligning sequences")
|
||||
#
|
||||
# d.close()
|
||||
# from obitools3.dms.capi.obialign cimport obi_lcs_align_one_column, \
|
||||
# obi_lcs_align_two_columns
|
||||
#
|
||||
#
|
||||
# def run(config):
|
||||
# import time
|
||||
#
|
||||
# # TODO: Build formatted comments with all parameters etc
|
||||
# __title__="Aligns one sequence column with itself or two sequence columns"
|
||||
#
|
||||
#
|
||||
# default_config = { 'inputview' : None,
|
||||
# }
|
||||
#
|
||||
# def addOptions(parser):
|
||||
#
|
||||
# # TODO put this common group somewhere else but I don't know where.
|
||||
# # Also some options should probably be in another group
|
||||
# group=parser.add_argument_group('DMS and view options')
|
||||
#
|
||||
# group.add_argument('--default-dms', '-d',
|
||||
# action="store", dest="obi:defaultdms",
|
||||
# metavar='<DMS NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the default DMS for reading and writing data.")
|
||||
#
|
||||
# group.add_argument('--input-view-1', '-i',
|
||||
# action="store", dest="obi:inputview1",
|
||||
# metavar='<INPUT VIEW NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the (first) input view.")
|
||||
#
|
||||
# group.add_argument('--input-view-2', '-I',
|
||||
# action="store", dest="obi:inputview2",
|
||||
# metavar='<INPUT VIEW NAME>',
|
||||
# default="",
|
||||
# type=str,
|
||||
# help="Eventually, the name of the second input view.")
|
||||
#
|
||||
# group.add_argument('--input-column-1', '-c',
|
||||
# action="store", dest="obi:inputcolumn1",
|
||||
# metavar='<INPUT COLUMN NAME>',
|
||||
# default="",
|
||||
# type=str,
|
||||
# help="Name of the (first) input column. "
|
||||
# " Default: the default nucleotide sequence column of the view if there is one.")
|
||||
#
|
||||
# group.add_argument('--input-column-2', '-C',
|
||||
# action="store", dest="obi:inputcolumn2",
|
||||
# metavar='<INPUT COLUMN NAME>',
|
||||
# default="",
|
||||
# type=str,
|
||||
# help="Eventually, the name of the second input column.")
|
||||
#
|
||||
# group.add_argument('--input-elt-1', '-e',
|
||||
# action="store", dest="obi:inputelement1",
|
||||
# metavar='<INPUT ELEMENT NAME>',
|
||||
# default="",
|
||||
# type=str,
|
||||
# help="If the first input column has multiple elements per line, name of the element referring to the sequence to align. "
|
||||
# " Default: the first element of the line.")
|
||||
#
|
||||
# group.add_argument('--input-elt-2', '-E',
|
||||
# action="store", dest="obi:inputelement2",
|
||||
# metavar='<INPUT ELEMENT NAME>',
|
||||
# default="",
|
||||
# type=str,
|
||||
# help="If the second input column has multiple elements per line, name of the element referring to the sequence to align. "
|
||||
# " Default: the first element of the line.")
|
||||
#
|
||||
# group.add_argument('--id-column-1', '-f',
|
||||
# action="store", dest="obi:idcolumn1",
|
||||
# metavar='<ID COLUMN NAME>',
|
||||
# default="",
|
||||
# type=str,
|
||||
# help="Name of the (first) column containing the identifiers of the sequences to align. "
|
||||
# " Default: the default ID column of the view if there is one.")
|
||||
#
|
||||
# group.add_argument('--id-column-2', '-F',
|
||||
# action="store", dest="obi:idcolumn2",
|
||||
# metavar='<ID COLUMN NAME>',
|
||||
# default="",
|
||||
# type=str,
|
||||
# help="Eventually, the name of the second ID column.")
|
||||
#
|
||||
# group.add_argument('--output-view', '-o',
|
||||
# action="store", dest="obi:outputview",
|
||||
# metavar='<OUTPUT VIEW NAME>',
|
||||
# default=None,
|
||||
# type=str,
|
||||
# help="Name of the output view.")
|
||||
#
|
||||
#
|
||||
# group=parser.add_argument_group('obi lcs specific options')
|
||||
#
|
||||
# group.add_argument('--threshold','-t',
|
||||
# action="store", dest="align:threshold",
|
||||
# metavar='<THRESHOLD>',
|
||||
# default=0.0,
|
||||
# type=float,
|
||||
# help="Score threshold. If the score is normalized and expressed in similarity (default),"
|
||||
# " it is an identity, e.g. 0.95 for an identity of 95%%. If the score is normalized"
|
||||
# " and expressed in distance, it is (1.0 - identity), e.g. 0.05 for an identity of 95%%."
|
||||
# " If the score is not normalized and expressed in similarity, it is the length of the"
|
||||
# " Longest Common Subsequence. If the score is not normalized and expressed in distance,"
|
||||
# " it is (reference length - LCS length)."
|
||||
# " Only sequence pairs with a similarity above <THRESHOLD> are printed. Default: 0.00"
|
||||
# " (no threshold).")
|
||||
#
|
||||
# group.add_argument('--longest-length','-L',
|
||||
# action="store_const", dest="align:reflength",
|
||||
# default=0,
|
||||
# const=1,
|
||||
# help="The reference length is the length of the longest sequence."
|
||||
# " Default: the reference length is the length of the alignment.")
|
||||
#
|
||||
# group.add_argument('--shortest-length','-l',
|
||||
# action="store_const", dest="align:reflength",
|
||||
# default=0,
|
||||
# const=2,
|
||||
# help="The reference length is the length of the shortest sequence."
|
||||
# " Default: the reference length is the length of the alignment.")
|
||||
#
|
||||
# group.add_argument('--raw','-r',
|
||||
# action="store_false", dest="align:normalize",
|
||||
# default=True,
|
||||
# help="Raw score, not normalized. Default: score is normalized with the reference sequence length.")
|
||||
#
|
||||
# group.add_argument('--distance','-D',
|
||||
# action="store_false", dest="align:similarity",
|
||||
# default=True,
|
||||
# help="Score is expressed in distance. Default: score is expressed in similarity.")
|
||||
#
|
||||
# group.add_argument('--print-seq','-s',
|
||||
# action="store_true", dest="align:printseq",
|
||||
# default=False,
|
||||
# help="The nucleotide sequences are written in the output view. Default: they are not written.")
|
||||
#
|
||||
# group.add_argument('--print-count','-n',
|
||||
# action="store_true", dest="align:printcount",
|
||||
# default=False,
|
||||
# help="Sequence counts are written in the output view. Default: they are not written.")
|
||||
#
|
||||
# group.add_argument('--thread-count','-p', # TODO should probably be in a specific option group
|
||||
# action="store", dest="align:threadcount",
|
||||
# metavar='<THREAD COUNT>',
|
||||
# default=1,
|
||||
# type=int,
|
||||
# help="Number of threads to use for the computation. Default: one.")
|
||||
#
|
||||
#
|
||||
# # cpdef align(str dms_n,
|
||||
# # str input_view_1_n, str output_view_n,
|
||||
# # str input_view_2_n="",
|
||||
# # str input_column_1_n="", str input_column_2_n="",
|
||||
# # str input_elt_1_n="", str input_elt_2_n="",
|
||||
# # str id_column_1_n="", str id_column_2_n="",
|
||||
# # double threshold=0.0, bint normalize=True,
|
||||
# # int reference=0, bint similarity_mode=True,
|
||||
# # bint print_seq=False, bint print_count=False,
|
||||
# # comments="",
|
||||
# # int thread_count=1) :
|
||||
# #
|
||||
# # cdef OBIDMS d
|
||||
# # d = OBIDMS(dms_n)
|
||||
# #
|
||||
# # if input_view_2_n == "" and input_column_2_n == "" :
|
||||
# # if obi_lcs_align_one_column(d._pointer, \
|
||||
# # str2bytes(input_view_1_n), \
|
||||
# # str2bytes(input_column_1_n), \
|
||||
# # str2bytes(input_elt_1_n), \
|
||||
# # str2bytes(id_column_1_n), \
|
||||
# # str2bytes(output_view_n), \
|
||||
# # str2bytes(comments), \
|
||||
# # print_seq, \
|
||||
# # print_count, \
|
||||
# # threshold, normalize, reference, similarity_mode,
|
||||
# # thread_count) < 0 :
|
||||
# # raise Exception("Error aligning sequences")
|
||||
# # else :
|
||||
# # if obi_lcs_align_two_columns(d._pointer, \
|
||||
# # str2bytes(input_view_1_n), \
|
||||
# # str2bytes(input_view_2_n), \
|
||||
# # str2bytes(input_column_1_n), \
|
||||
# # str2bytes(input_column_2_n), \
|
||||
# # str2bytes(input_elt_1_n), \
|
||||
# # str2bytes(input_elt_2_n), \
|
||||
# # str2bytes(id_column_1_n), \
|
||||
# # str2bytes(id_column_2_n), \
|
||||
# # str2bytes(output_view_n), \
|
||||
# # str2bytes(comments), \
|
||||
# # print_seq, \
|
||||
# # print_count, \
|
||||
# # threshold, normalize, reference, similarity_mode) < 0 :
|
||||
# # raise Exception("Error aligning sequences")
|
||||
# #
|
||||
# # d.close()
|
||||
# #
|
||||
# #
|
||||
def run(config):
|
||||
pass
|
||||
# TODO: Build formatted comments with all parameters etc
|
||||
# comments = "Obi align"
|
||||
#
|
||||
# # Call cython alignment function
|
||||
@ -229,8 +229,8 @@ def addOptions(parser):
|
||||
# thread_count = config['align']['threadcount'])
|
||||
#
|
||||
# print("Done.")
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
||||
#
|
||||
# #
|
||||
# #
|
||||
# #
|
||||
# #
|
||||
# #
|
@ -1,383 +1,393 @@
|
||||
from obitools3.apps.progress cimport ProgressBar # TODO I absolutely don't understand why it doesn't work without that line
|
||||
from obitools3.dms.dms import OBIDMS # TODO cimport doesn't work
|
||||
# from obitools3.obidms._obidms import OBIView_line_selection
|
||||
# from obitools3.utils cimport str2bytes
|
||||
#
|
||||
# import shutil
|
||||
# import string
|
||||
# import random
|
||||
#
|
||||
#
|
||||
# VIEW_TYPES = ["", "NUC_SEQS_VIEW"]
|
||||
from obitools3.dms.view.view import View, Line_selection
|
||||
from obitools3.dms.dms import DMS
|
||||
from obitools3.dms.column import Column
|
||||
from obitools3.utils cimport str2bytes
|
||||
|
||||
import shutil
|
||||
import string
|
||||
import random
|
||||
|
||||
|
||||
VIEW_TYPES = [""] #, "NUC_SEQS_VIEW"]
|
||||
#COL_TYPES = ["OBI_BOOL", "OBI_CHAR", "OBI_FLOAT", "OBI_INT", "OBI_SEQ", "OBI_STR"]
|
||||
# NUC_SEQUENCE_COLUMN = "NUC_SEQ"
|
||||
# ID_COLUMN = "ID"
|
||||
# DEFINITION_COLUMN = "DEFINITION"
|
||||
# QUALITY_COLUMN = "QUALITY"
|
||||
# SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
|
||||
#
|
||||
#
|
||||
# NAME_MAX_LEN = 200
|
||||
# COL_COMMENTS_MAX_LEN = 2048
|
||||
# MAX_INT = 2147483647 # used to generate random float values
|
||||
#
|
||||
#
|
||||
# __title__="Tests if the obitools are working properly"
|
||||
#
|
||||
#
|
||||
# default_config = {
|
||||
# }
|
||||
#
|
||||
#
|
||||
# def random_length(max_len):
|
||||
# return random.randint(1, max_len)
|
||||
#
|
||||
#
|
||||
# def random_bool(config):
|
||||
# return random.choice([True, False])
|
||||
#
|
||||
#
|
||||
# def random_char(config):
|
||||
# return random.choice(string.ascii_lowercase)
|
||||
#
|
||||
#
|
||||
# def random_float(config):
|
||||
# return random.randint(0, MAX_INT) + random.random()
|
||||
#
|
||||
#
|
||||
# def random_int(config):
|
||||
# return random.randint(0, config['test']['maxlinenb'])
|
||||
#
|
||||
#
|
||||
# def random_seq(config):
|
||||
# return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen']))))
|
||||
#
|
||||
#
|
||||
# def random_str(config):
|
||||
# return random_str_with_max_len(config['test']['strmaxlen'])
|
||||
#
|
||||
#
|
||||
# def random_str_with_max_len(max_len):
|
||||
# return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len)))
|
||||
#
|
||||
#
|
||||
# def random_column(infos):
|
||||
# return random.choice(sorted(list(infos['view'].columns)))
|
||||
#
|
||||
#
|
||||
# def random_unique_name(infos):
|
||||
# name = ""
|
||||
# while name == "" or name in infos['unique_names'] :
|
||||
# name = random_str_with_max_len(NAME_MAX_LEN)
|
||||
# infos['unique_names'].append(name)
|
||||
# return name
|
||||
#
|
||||
#
|
||||
# def random_unique_element_name(config, infos):
|
||||
# name = ""
|
||||
# while name == "" or name in infos['unique_names'] :
|
||||
# name = random_str_with_max_len(config['test']['elt_name_max_len'])
|
||||
# infos['unique_names'].append(name)
|
||||
# return name
|
||||
#
|
||||
#
|
||||
# def print_test(config, sentence):
|
||||
# if config['test']['verbose'] :
|
||||
# print(sentence)
|
||||
#
|
||||
#
|
||||
# def test_set_and_get(config, infos):
|
||||
# print_test(config, ">>> Set and get test")
|
||||
# col_name = random_column(infos)
|
||||
# col = infos['view'][col_name]
|
||||
# element_names = col.elements_names
|
||||
# data_type = col.data_type
|
||||
# if data_type == "OBI_QUAL" :
|
||||
# print_test(config, "-")
|
||||
# return
|
||||
# idx = random_int(config)
|
||||
# value = infos['random_generator'][data_type](config)
|
||||
# if col.nb_elements_per_line > 1 :
|
||||
# elt = random.choice(element_names)
|
||||
# col[idx][elt] = value
|
||||
# assert col[idx][elt] == value, "Set value != gotten value "+str(col[idx][elt])+" != "+str(value)
|
||||
# else:
|
||||
# col[idx] = value
|
||||
# assert col[idx] == value, "Set value != gotten value "+str(col[idx])+" != "+str(value)
|
||||
#
|
||||
# print_test(config, ">>> Set and get test OK")
|
||||
#
|
||||
#
|
||||
# def test_add_col(config, infos):
|
||||
# print_test(config, ">>> Add column test")
|
||||
# #existing_col = random_bool(config) # TODO doesn't work because of line count problem. See obiview.c line 1737
|
||||
# #if existing_col and infos["view_names"] != [] :
|
||||
# # random_view = infos['dms'].open_view(random.choice(infos["view_names"]))
|
||||
# # random_column = random_view[random.choice(sorted(list(random_view.columns))]
|
||||
# # random_column_refs = random_column.refs
|
||||
# # if random_column_refs['name'] in infos['view'] :
|
||||
# # alias = random_unique_name(infos)
|
||||
# # else :
|
||||
# # alias = ''
|
||||
# # infos['view'].add_column(random_column_refs['name'], version_number=random_column_refs['version'], alias=alias, create=False)
|
||||
# # random_view.close()
|
||||
# #else :
|
||||
# create_random_column(config, infos)
|
||||
# print_test(config, ">>> Add column test OK")
|
||||
#
|
||||
#
|
||||
# def test_delete_col(config, infos):
|
||||
# print_test(config, ">>> Delete column test")
|
||||
# if len(list(infos['view'].columns)) <= 1 :
|
||||
# print_test(config, "-")
|
||||
# return
|
||||
# col_name = random_column(infos)
|
||||
# if col_name in SPECIAL_COLUMNS :
|
||||
# print_test(config, "-")
|
||||
# return
|
||||
# infos['view'].delete_column(col_name)
|
||||
# print_test(config, ">>> Delete column test OK")
|
||||
#
|
||||
#
|
||||
# def test_col_alias(config, infos):
|
||||
# print_test(config, ">>> Changing column alias test")
|
||||
# col_name = random_column(infos)
|
||||
# if col_name in SPECIAL_COLUMNS :
|
||||
# print_test(config, "-")
|
||||
# return
|
||||
# infos['view'][col_name].alias = random_unique_name(infos)
|
||||
# print_test(config, ">>> Changing column alias test OK")
|
||||
#
|
||||
#
|
||||
# def test_new_view(config, infos):
|
||||
# print_test(config, ">>> New view test")
|
||||
# random_new_view(config, infos)
|
||||
# print_test(config, ">>> New view test OK")
|
||||
#
|
||||
#
|
||||
# def random_test(config, infos):
|
||||
# return random.choice(infos['tests'])(config, infos)
|
||||
#
|
||||
#
|
||||
# def random_view_type():
|
||||
# return random.choice(VIEW_TYPES)
|
||||
#
|
||||
#
|
||||
# def random_col_type():
|
||||
# return random.choice(COL_TYPES)
|
||||
#
|
||||
#
|
||||
# def fill_column(config, infos, col) :
|
||||
# data_type = col.data_type
|
||||
# element_names = col.elements_names
|
||||
#
|
||||
# if len(element_names) > 1 :
|
||||
# for i in range(random_int(config)) :
|
||||
# for j in range(len(element_names)) :
|
||||
# col[i][element_names[j]] = infos['random_generator'][data_type](config)
|
||||
# else :
|
||||
# for i in range(random_int(config)) :
|
||||
# col[i] = infos['random_generator'][data_type](config)
|
||||
#
|
||||
#
|
||||
# def create_random_column(config, infos) :
|
||||
# alias = random.choice(['', random_unique_name(infos)])
|
||||
# nb_elements_per_line=random.randint(1, config['test']['maxelts'])
|
||||
# elements_names = []
|
||||
# for i in range(nb_elements_per_line) :
|
||||
# elements_names.append(random_unique_element_name(config, infos))
|
||||
# elements_names = random.choice([None, elements_names])
|
||||
# name = random_unique_name(infos)
|
||||
# infos['view'].add_column(name,
|
||||
# alias=alias,
|
||||
# type=random_col_type(),
|
||||
# nb_elements_per_line=nb_elements_per_line,
|
||||
# elements_names=elements_names,
|
||||
# indexer_name=random.choice(['', random_unique_name(infos)]),
|
||||
# comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN),
|
||||
# create=True
|
||||
# )
|
||||
# if alias != '' :
|
||||
# return infos['view'][alias]
|
||||
# else :
|
||||
# return infos['view'][name]
|
||||
#
|
||||
#
|
||||
# def fill_view(config, infos):
|
||||
# for i in range(random.randint(1, config['test']['maxinicolcount'])) :
|
||||
# col = create_random_column(config, infos)
|
||||
# fill_column(config, infos, col)
|
||||
#
|
||||
#
|
||||
# def random_new_view(config, infos, first=False):
|
||||
# v_to_clone = None
|
||||
# line_selection = None
|
||||
# quality_col = False # TODO
|
||||
# if not first:
|
||||
# infos['view_names'].append(infos['view'].name)
|
||||
# infos['view'].close()
|
||||
# v_to_clone = infos['dms'].open_view(random.choice(infos["view_names"]))
|
||||
# v_type = ""
|
||||
# print_test(config, "View to clone: ")
|
||||
# print_test(config, repr(v_to_clone))
|
||||
# create_line_selection = random_bool(config)
|
||||
# if create_line_selection and v_to_clone.line_count > 0:
|
||||
# print_test(config, "New view with new line selection.")
|
||||
# line_selection = OBIView_line_selection(v_to_clone)
|
||||
# for i in range(random.randint(1, v_to_clone.line_count)) :
|
||||
# line_selection.append(random.randint(0, v_to_clone.line_count-1))
|
||||
# #print_test(config, "New line selection: "+str(line_selection))
|
||||
# else :
|
||||
# v_type = random_view_type()
|
||||
#
|
||||
# if line_selection is not None :
|
||||
# infos['view'] = infos['dms'].clone_view_with_line_selection(random_unique_name(infos), line_selection, comments=random_str_with_max_len(config['test']['commentsmaxlen']))
|
||||
# elif v_to_clone is not None :
|
||||
# infos['view'] = infos['dms'].clone_view(random_unique_name(infos), v_to_clone, comments=random_str_with_max_len(config['test']['commentsmaxlen']))
|
||||
# else :
|
||||
# infos['view'] = infos['dms'].new_view(random_unique_name(infos), view_type=v_type, comments=random_str_with_max_len(config['test']['commentsmaxlen']), quality_column=quality_col)
|
||||
#
|
||||
# print_test(config, repr(infos['view']))
|
||||
# if v_to_clone is not None :
|
||||
# if line_selection is None:
|
||||
# assert v_to_clone.line_count == infos['view'].line_count, "New view and cloned view don't have the same line count"
|
||||
# else :
|
||||
# assert len(line_selection) == infos['view'].line_count, "New view with new line selection does not have the right line count"
|
||||
# v_to_clone.close()
|
||||
# if first :
|
||||
# fill_view(config, infos)
|
||||
#
|
||||
#
|
||||
# def create_test_obidms(config, infos):
|
||||
# infos['dms'] = OBIDMS(config['obi']['defaultdms'])
|
||||
#
|
||||
#
|
||||
# def ini_dms_and_first_view(config, infos):
|
||||
# create_test_obidms(config, infos)
|
||||
# random_new_view(config, infos, first=True)
|
||||
# infos['view_names'] = []
|
||||
#
|
||||
#
|
||||
# def addOptions(parser):
|
||||
#
|
||||
# # TODO put this common group somewhere else but I don't know where
|
||||
# group=parser.add_argument_group('DMS and view options')
|
||||
#
|
||||
# group.add_argument('--default-dms','-d',
|
||||
# action="store", dest="obi:defaultdms",
|
||||
# metavar='<DMS NAME>',
|
||||
# default="/tmp/test_dms",
|
||||
# type=str,
|
||||
# help="Name of the default DMS for reading and writing data. "
|
||||
# "Default: /tmp/test_dms")
|
||||
#
|
||||
# group=parser.add_argument_group('obi test specific options')
|
||||
#
|
||||
# group.add_argument('--nb_tests','-n',
|
||||
# action="store", dest="test:nbtests",
|
||||
# metavar='<NB_TESTS>',
|
||||
# default=1000,
|
||||
# type=int,
|
||||
# help="Number of tests to carry out. "
|
||||
# "Default: 1000")
|
||||
#
|
||||
# group.add_argument('--seq_max_len','-s',
|
||||
# action="store", dest="test:seqmaxlen",
|
||||
# metavar='<SEQ_MAX_LEN>',
|
||||
# default=200,
|
||||
# type=int,
|
||||
# help="Maximum length of DNA sequences. "
|
||||
# "Default: 200")
|
||||
#
|
||||
# group.add_argument('--str_max_len','-t',
|
||||
# action="store", dest="test:strmaxlen",
|
||||
# metavar='<STR_MAX_LEN>',
|
||||
# default=200,
|
||||
# type=int,
|
||||
# help="Maximum length of character strings. "
|
||||
# "Default: 200")
|
||||
#
|
||||
# group.add_argument('--comments_max_len','-c',
|
||||
# action="store", dest="test:commentsmaxlen",
|
||||
# metavar='<COMMENTS_MAX_LEN>',
|
||||
# default=10000,
|
||||
# type=int,
|
||||
# help="Maximum length of view comments. "
|
||||
# "Default: 10000")
|
||||
#
|
||||
# group.add_argument('--max_ini_col_count','-o',
|
||||
# action="store", dest="test:maxinicolcount",
|
||||
# metavar='<MAX_INI_COL_COUNT>',
|
||||
# default=10,
|
||||
# type=int,
|
||||
# help="Maximum number of columns in the initial view. "
|
||||
# "Default: 10")
|
||||
#
|
||||
# group.add_argument('--max_line_nb','-l',
|
||||
# action="store", dest="test:maxlinenb",
|
||||
# metavar='<MAX_LINE_NB>',
|
||||
# default=10000,
|
||||
# type=int,
|
||||
# help="Maximum number of lines in a column. "
|
||||
# "Default: 10000")
|
||||
#
|
||||
# group.add_argument('--max_elts_per_line','-e',
|
||||
# action="store", dest="test:maxelts",
|
||||
# metavar='<MAX_ELTS_PER_LINE>',
|
||||
# default=20,
|
||||
# type=int,
|
||||
# help="Maximum number of elements per line in a column. "
|
||||
# "Default: 20")
|
||||
#
|
||||
# group.add_argument('--verbose','-v',
|
||||
# action="store_true", dest="test:verbose",
|
||||
# default=False,
|
||||
# help="Print the tests. "
|
||||
# "Default: Don't print the tests")
|
||||
#
|
||||
# group.add_argument('--seed','-g',
|
||||
# action="store", dest="test:seed",
|
||||
# metavar='<SEED>',
|
||||
# default=None,
|
||||
# help="Seed (use for reproducible tests). "
|
||||
# "Default: Seed is determined by Python")
|
||||
#
|
||||
# def run(config):
|
||||
#
|
||||
# if 'seed' in config['test'] :
|
||||
# random.seed(config['test']['seed'])
|
||||
#
|
||||
# infos = {'dms': None,
|
||||
# 'view': None,
|
||||
# 'view_names': None,
|
||||
# 'unique_names': [],
|
||||
# 'random_generator': {"OBI_BOOL": random_bool, "OBI_CHAR": random_char, "OBI_FLOAT": random_float, "OBI_INT": random_int, "OBI_SEQ": random_seq, "OBI_STR": random_str},
|
||||
# 'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias, test_new_view]
|
||||
# }
|
||||
#
|
||||
# config['test']['elt_name_max_len'] = int((COL_COMMENTS_MAX_LEN - config['test']['maxelts']) / config['test']['maxelts'])
|
||||
#
|
||||
# print("Initializing the DMS and the first view...")
|
||||
#
|
||||
# shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
|
||||
#
|
||||
# ini_dms_and_first_view(config, infos)
|
||||
# print_test(config, repr(infos['view']))
|
||||
#
|
||||
# i = 0
|
||||
# for t in range(config['test']['nbtests']):
|
||||
# random_test(config, infos)
|
||||
# print_test(config, repr(infos['view']))
|
||||
# i+=1
|
||||
# if (i%(config['test']['nbtests']/10)) == 0 :
|
||||
# print("Testing......"+str(i*100/config['test']['nbtests'])+"%")
|
||||
#
|
||||
# #print(infos)
|
||||
#
|
||||
# infos['view'].close()
|
||||
# infos['dms'].close()
|
||||
# shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
|
||||
#
|
||||
# print("Done.")
|
||||
COL_TYPES = [1, 2, 3, 4, 6, 7]
|
||||
NUC_SEQUENCE_COLUMN = "NUC_SEQ"
|
||||
ID_COLUMN = "ID"
|
||||
DEFINITION_COLUMN = "DEFINITION"
|
||||
QUALITY_COLUMN = "QUALITY"
|
||||
SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]
|
||||
|
||||
|
||||
NAME_MAX_LEN = 200
|
||||
COL_COMMENTS_MAX_LEN = 2048
|
||||
MAX_INT = 2147483647 # used to generate random float values
|
||||
|
||||
|
||||
__title__="Tests if the obitools are working properly"
|
||||
|
||||
|
||||
default_config = {
|
||||
}
|
||||
|
||||
|
||||
def random_length(max_len):
|
||||
return random.randint(1, max_len)
|
||||
|
||||
|
||||
def random_bool(config):
|
||||
return random.choice([True, False])
|
||||
|
||||
|
||||
def random_char(config):
|
||||
return str2bytes(random.choice(string.ascii_lowercase))
|
||||
|
||||
|
||||
def random_float(config):
|
||||
return random.randint(0, MAX_INT) + random.random()
|
||||
|
||||
|
||||
def random_int(config):
|
||||
return random.randint(0, config['test']['maxlinenb'])
|
||||
|
||||
|
||||
def random_seq(config):
|
||||
return str2bytes(''.join(random.choice(['a','t','g','c']) for i in range(random_length(config['test']['seqmaxlen']))))
|
||||
|
||||
|
||||
def random_bytes(config):
|
||||
return random_bytes_with_max_len(config['test']['strmaxlen'])
|
||||
|
||||
|
||||
def random_str_with_max_len(max_len):
|
||||
return ''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len)))
|
||||
|
||||
|
||||
def random_bytes_with_max_len(max_len):
|
||||
return str2bytes(''.join(random.choice(string.ascii_lowercase) for i in range(random_length(max_len))))
|
||||
|
||||
|
||||
def random_column(infos):
|
||||
return random.choice(sorted(list(infos['view'].keys())))
|
||||
|
||||
|
||||
def random_unique_name(infos):
|
||||
name = ""
|
||||
while name == "" or name in infos['unique_names'] :
|
||||
name = random_str_with_max_len(NAME_MAX_LEN)
|
||||
infos['unique_names'].append(name)
|
||||
return name
|
||||
|
||||
|
||||
def random_unique_element_name(config, infos):
|
||||
name = ""
|
||||
while name == "" or name in infos['unique_names'] :
|
||||
name = random_str_with_max_len(config['test']['elt_name_max_len'])
|
||||
infos['unique_names'].append(name)
|
||||
return name
|
||||
|
||||
|
||||
def print_test(config, sentence):
|
||||
if config['test']['verbose'] :
|
||||
print(sentence)
|
||||
|
||||
|
||||
def test_set_and_get(config, infos):
|
||||
print_test(config, ">>> Set and get test")
|
||||
col_name = random_column(infos)
|
||||
col = infos['view'][col_name]
|
||||
element_names = col.elements_names
|
||||
data_type = col.data_type
|
||||
if data_type == "OBI_QUAL" :
|
||||
print_test(config, "-")
|
||||
return
|
||||
idx = random_int(config)
|
||||
value = random.choice([None, infos['random_generator'][data_type](config)])
|
||||
if col.nb_elements_per_line > 1 :
|
||||
elt = random.choice(element_names)
|
||||
col[idx][elt] = value
|
||||
assert col[idx][elt] == value, "Column: "+repr(col)+"\nSet value != gotten value "+str(value)+" != "+str(col[idx][elt])
|
||||
else:
|
||||
col[idx] = value
|
||||
assert col[idx] == value, "Column: "+repr(col)+"\nSet value != gotten value "+str(value)+" != "+str(col[idx])
|
||||
|
||||
print_test(config, ">>> Set and get test OK")
|
||||
|
||||
|
||||
def test_add_col(config, infos):
|
||||
print_test(config, ">>> Add column test")
|
||||
#existing_col = random_bool(config) # TODO doesn't work because of line count problem. See obiview.c line 1737
|
||||
#if existing_col and infos["view_names"] != [] :
|
||||
# random_view = infos['dms'].open_view(random.choice(infos["view_names"]))
|
||||
# random_column = random_view[random.choice(sorted(list(random_view.columns))]
|
||||
# random_column_refs = random_column.refs
|
||||
# if random_column_refs['name'] in infos['view'] :
|
||||
# alias = random_unique_name(infos)
|
||||
# else :
|
||||
# alias = ''
|
||||
# infos['view'].add_column(random_column_refs['name'], version_number=random_column_refs['version'], alias=alias, create=False)
|
||||
# random_view.close()
|
||||
#else :
|
||||
create_random_column(config, infos)
|
||||
print_test(config, ">>> Add column test OK")
|
||||
|
||||
|
||||
def test_delete_col(config, infos):
|
||||
print_test(config, ">>> Delete column test")
|
||||
if len(list(infos['view'].keys())) <= 1 :
|
||||
print_test(config, "-")
|
||||
return
|
||||
col_name = random_column(infos)
|
||||
if col_name in SPECIAL_COLUMNS :
|
||||
print_test(config, "-")
|
||||
return
|
||||
infos['view'].delete_column(col_name)
|
||||
print_test(config, ">>> Delete column test OK")
|
||||
|
||||
|
||||
def test_col_alias(config, infos):
|
||||
print_test(config, ">>> Changing column alias test")
|
||||
col_name = random_column(infos)
|
||||
if col_name in SPECIAL_COLUMNS :
|
||||
print_test(config, "-")
|
||||
return
|
||||
infos['view'][col_name].name = random_unique_name(infos)
|
||||
print_test(config, ">>> Changing column alias test OK")
|
||||
|
||||
|
||||
def test_new_view(config, infos):
|
||||
print_test(config, ">>> New view test")
|
||||
random_new_view(config, infos)
|
||||
print_test(config, ">>> New view test OK")
|
||||
|
||||
|
||||
def random_test(config, infos):
|
||||
return random.choice(infos['tests'])(config, infos)
|
||||
|
||||
|
||||
def random_view_type():
|
||||
return random.choice(VIEW_TYPES)
|
||||
|
||||
|
||||
def random_col_type():
|
||||
return random.choice(COL_TYPES)
|
||||
|
||||
|
||||
def fill_column(config, infos, col) :
|
||||
data_type = col.data_type
|
||||
element_names = col.elements_names
|
||||
|
||||
if len(element_names) > 1 :
|
||||
for i in range(random_int(config)) :
|
||||
for j in range(len(element_names)) :
|
||||
col[i][element_names[j]] = random.choice([None, infos['random_generator'][data_type](config)])
|
||||
else :
|
||||
for i in range(random_int(config)) :
|
||||
col[i] = random.choice([None, infos['random_generator'][data_type](config)])
|
||||
|
||||
|
||||
def create_random_column(config, infos) :
|
||||
alias = random.choice(['', random_unique_name(infos)])
|
||||
nb_elements_per_line=random.randint(1, config['test']['maxelts'])
|
||||
elements_names = []
|
||||
for i in range(nb_elements_per_line) :
|
||||
elements_names.append(random_unique_element_name(config, infos))
|
||||
elements_names = random.choice([None, elements_names])
|
||||
name = random_unique_name(infos)
|
||||
data_type = random_col_type()
|
||||
|
||||
column = Column.new_column(infos['view'],
|
||||
name,
|
||||
data_type,
|
||||
nb_elements_per_line=nb_elements_per_line,
|
||||
elements_names=elements_names,
|
||||
comments=random_str_with_max_len(COL_COMMENTS_MAX_LEN),
|
||||
alias=alias
|
||||
)
|
||||
|
||||
if alias != '' :
|
||||
assert infos['view'][alias] == column
|
||||
else :
|
||||
assert infos['view'][name] == column
|
||||
|
||||
return column
|
||||
|
||||
|
||||
def fill_view(config, infos):
|
||||
for i in range(random.randint(1, config['test']['maxinicolcount'])) :
|
||||
col = create_random_column(config, infos)
|
||||
fill_column(config, infos, col)
|
||||
|
||||
|
||||
def random_new_view(config, infos, first=False):
|
||||
v_to_clone = None
|
||||
line_selection = None
|
||||
quality_col = False # TODO
|
||||
if not first:
|
||||
infos['view_names'].append(infos['view'].name)
|
||||
infos['view'].close()
|
||||
v_to_clone = View.open(infos['dms'], random.choice(infos["view_names"]))
|
||||
v_type = ""
|
||||
print_test(config, "View to clone: ")
|
||||
print_test(config, repr(v_to_clone))
|
||||
create_line_selection = random_bool(config)
|
||||
if create_line_selection and v_to_clone.line_count > 0:
|
||||
print_test(config, "New view with new line selection.")
|
||||
line_selection = Line_selection(v_to_clone)
|
||||
for i in range(random.randint(1, v_to_clone.line_count)) :
|
||||
line_selection.append(random.randint(0, v_to_clone.line_count-1))
|
||||
#print_test(config, "New line selection: "+str(line_selection))
|
||||
else :
|
||||
v_type = random_view_type()
|
||||
|
||||
if line_selection is not None :
|
||||
infos['view'] = line_selection.materialize(random_unique_name(infos), comments=random_str_with_max_len(config['test']['commentsmaxlen']))
|
||||
elif v_to_clone is not None :
|
||||
infos['view'] = v_to_clone.clone(random_unique_name(infos), comments=random_str_with_max_len(config['test']['commentsmaxlen']))
|
||||
else :
|
||||
infos['view'] = View.new(infos['dms'], random_unique_name(infos), comments=random_str_with_max_len(config['test']['commentsmaxlen'])) # TODO random view class
|
||||
|
||||
print_test(config, repr(infos['view']))
|
||||
if v_to_clone is not None :
|
||||
if line_selection is None:
|
||||
assert v_to_clone.line_count == infos['view'].line_count, "New view and cloned view don't have the same line count : "+str(v_to_clone.line_count)+" (view to clone line count) != "+str(infos['view'].line_count)+" (new view line count)"
|
||||
else :
|
||||
assert len(line_selection) == infos['view'].line_count, "New view with new line selection does not have the right line count : "+str(len(line_selection))+" (line selection length) != "+str(infos['view'].line_count)+" (new view line count)"
|
||||
v_to_clone.close()
|
||||
if first :
|
||||
fill_view(config, infos)
|
||||
|
||||
|
||||
def create_test_obidms(config, infos):
|
||||
infos['dms'] = DMS.new(config['obi']['defaultdms'])
|
||||
|
||||
|
||||
def ini_dms_and_first_view(config, infos):
|
||||
create_test_obidms(config, infos)
|
||||
random_new_view(config, infos, first=True)
|
||||
infos['view_names'] = []
|
||||
|
||||
|
||||
def addOptions(parser):
|
||||
|
||||
# TODO put this common group somewhere else but I don't know where
|
||||
group=parser.add_argument_group('DMS and view options')
|
||||
|
||||
group.add_argument('--default-dms','-d',
|
||||
action="store", dest="obi:defaultdms",
|
||||
metavar='<DMS NAME>',
|
||||
default="/tmp/test_dms",
|
||||
type=str,
|
||||
help="Name of the default DMS for reading and writing data. "
|
||||
"Default: /tmp/test_dms")
|
||||
|
||||
group=parser.add_argument_group('obi test specific options')
|
||||
|
||||
group.add_argument('--nb_tests','-n',
|
||||
action="store", dest="test:nbtests",
|
||||
metavar='<NB_TESTS>',
|
||||
default=1000,
|
||||
type=int,
|
||||
help="Number of tests to carry out. "
|
||||
"Default: 1000")
|
||||
|
||||
group.add_argument('--seq_max_len','-s',
|
||||
action="store", dest="test:seqmaxlen",
|
||||
metavar='<SEQ_MAX_LEN>',
|
||||
default=200,
|
||||
type=int,
|
||||
help="Maximum length of DNA sequences. "
|
||||
"Default: 200")
|
||||
|
||||
group.add_argument('--str_max_len','-t',
|
||||
action="store", dest="test:strmaxlen",
|
||||
metavar='<STR_MAX_LEN>',
|
||||
default=200,
|
||||
type=int,
|
||||
help="Maximum length of character strings. "
|
||||
"Default: 200")
|
||||
|
||||
group.add_argument('--comments_max_len','-c',
|
||||
action="store", dest="test:commentsmaxlen",
|
||||
metavar='<COMMENTS_MAX_LEN>',
|
||||
default=10000,
|
||||
type=int,
|
||||
help="Maximum length of view comments. "
|
||||
"Default: 10000")
|
||||
|
||||
group.add_argument('--max_ini_col_count','-o',
|
||||
action="store", dest="test:maxinicolcount",
|
||||
metavar='<MAX_INI_COL_COUNT>',
|
||||
default=10,
|
||||
type=int,
|
||||
help="Maximum number of columns in the initial view. "
|
||||
"Default: 10")
|
||||
|
||||
group.add_argument('--max_line_nb','-l',
|
||||
action="store", dest="test:maxlinenb",
|
||||
metavar='<MAX_LINE_NB>',
|
||||
default=10000,
|
||||
type=int,
|
||||
help="Maximum number of lines in a column. "
|
||||
"Default: 10000")
|
||||
|
||||
group.add_argument('--max_elts_per_line','-e',
|
||||
action="store", dest="test:maxelts",
|
||||
metavar='<MAX_ELTS_PER_LINE>',
|
||||
default=20,
|
||||
type=int,
|
||||
help="Maximum number of elements per line in a column. "
|
||||
"Default: 20")
|
||||
|
||||
group.add_argument('--verbose','-v',
|
||||
action="store_true", dest="test:verbose",
|
||||
default=False,
|
||||
help="Print the tests. "
|
||||
"Default: Don't print the tests")
|
||||
|
||||
group.add_argument('--seed','-g',
|
||||
action="store", dest="test:seed",
|
||||
metavar='<SEED>',
|
||||
default=None,
|
||||
help="Seed (use for reproducible tests). "
|
||||
"Default: Seed is determined by Python")
|
||||
|
||||
def run(config):
|
||||
|
||||
if 'seed' in config['test'] :
|
||||
random.seed(config['test']['seed'])
|
||||
|
||||
infos = {'dms': None,
|
||||
'view': None,
|
||||
'view_names': None,
|
||||
'unique_names': [],
|
||||
'random_generator': {b"OBI_BOOL": random_bool, b"OBI_CHAR": random_char, b"OBI_FLOAT": random_float, b"OBI_INT": random_int, b"OBI_SEQ": random_seq, b"OBI_STR": random_bytes},
|
||||
'tests': [test_set_and_get, test_add_col, test_delete_col, test_col_alias, test_new_view]
|
||||
}
|
||||
|
||||
config['test']['elt_name_max_len'] = int((COL_COMMENTS_MAX_LEN - config['test']['maxelts']) / config['test']['maxelts'])
|
||||
|
||||
print("Initializing the DMS and the first view...")
|
||||
|
||||
shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
|
||||
|
||||
ini_dms_and_first_view(config, infos)
|
||||
print_test(config, repr(infos['view']))
|
||||
|
||||
i = 0
|
||||
for t in range(config['test']['nbtests']):
|
||||
random_test(config, infos)
|
||||
print_test(config, repr(infos['view']))
|
||||
i+=1
|
||||
if (i%(config['test']['nbtests']/10)) == 0 :
|
||||
print("Testing......"+str(i*100/config['test']['nbtests'])+"%")
|
||||
|
||||
#print(infos)
|
||||
|
||||
infos['view'].close()
|
||||
infos['dms'].close()
|
||||
shutil.rmtree(config['obi']['defaultdms']+'.obidms', ignore_errors=True)
|
||||
|
||||
print("Done.")
|
||||
|
||||
|
Reference in New Issue
Block a user