New command: obi cat to concatenate views (not optimized yet)
This commit is contained in:
104
python/obitools3/commands/cat.pyx
Executable file
104
python/obitools3/commands/cat.pyx
Executable file
@ -0,0 +1,104 @@
|
||||
#cython: language_level=3
|
||||
|
||||
from obitools3.apps.progress cimport ProgressBar # @UnresolvedImport
|
||||
from obitools3.dms import DMS
|
||||
from obitools3.dms.view.view cimport View, Line_selection
|
||||
from obitools3.uri.decode import open_uri
|
||||
from obitools3.apps.optiongroups import addMinimalInputOption, addMinimalOutputOption
|
||||
from obitools3.dms.view import RollbackException
|
||||
from obitools3.apps.config import logger
|
||||
from obitools3.utils cimport str2bytes
|
||||
from obitools3.dms.view.typed_view.view_NUC_SEQS cimport View_NUC_SEQS
|
||||
from obitools3.dms.capi.obiview cimport QUALITY_COLUMN
|
||||
from obitools3.commands.ngsfilter import REVERSE_QUALITY_COLUMN_NAME # TODO should be stored in C
|
||||
|
||||
import time
|
||||
import sys
|
||||
|
||||
from cpython.exc cimport PyErr_CheckSignals
|
||||
|
||||
|
||||
__title__="Concatenate views."
|
||||
|
||||
|
||||
def addOptions(parser):
|
||||
|
||||
addMinimalOutputOption(parser)
|
||||
|
||||
group=parser.add_argument_group('obi cat specific options')
|
||||
|
||||
group.add_argument("-c",
|
||||
action="append", dest="cat:views_to_cat",
|
||||
metavar="<VIEW_NAME>",
|
||||
default=[],
|
||||
type=str,
|
||||
help="URI of a view to concatenate. (e.g. 'my_dms/my_view'). "
|
||||
"Several -c options can be used on the same "
|
||||
"command line.")
|
||||
|
||||
|
||||
def run(config):
|
||||
|
||||
DMS.obi_atexit()
|
||||
|
||||
logger("info", "obi cat")
|
||||
|
||||
# Open the views to concatenate
|
||||
iview_list = []
|
||||
idms_list = []
|
||||
total_len = 0
|
||||
v_type = View_NUC_SEQS
|
||||
for v_uri in config["cat"]["views_to_cat"]:
|
||||
input = open_uri(v_uri)
|
||||
if input is None:
|
||||
raise Exception("Could not read input view")
|
||||
i_dms = input[0]
|
||||
i_view = input[1]
|
||||
if input[2] != View_NUC_SEQS: # Check view type (output view is nuc_seqs view if all input view are nuc_seqs view)
|
||||
v_type = View
|
||||
total_len += len(i_view)
|
||||
iview_list.append(i_view)
|
||||
idms_list.append(i_dms)
|
||||
|
||||
# Open the output: only the DMS
|
||||
output = open_uri(config['obi']['outputURI'],
|
||||
input=False,
|
||||
newviewtype=v_type)
|
||||
if output is None:
|
||||
raise Exception("Could not create output view")
|
||||
o_dms = output[0]
|
||||
o_view = output[1]
|
||||
|
||||
# Initialize the progress bar
|
||||
pb = ProgressBar(total_len, config, seconde=5)
|
||||
|
||||
i = 0
|
||||
for v in iview_list:
|
||||
for l in v:
|
||||
PyErr_CheckSignals()
|
||||
pb(i)
|
||||
o_view[i] = l
|
||||
i+=1
|
||||
|
||||
# Deletes quality columns if there are any
|
||||
if QUALITY_COLUMN in o_view:
|
||||
o_view.delete_column(QUALITY_COLUMN)
|
||||
if REVERSE_QUALITY_COLUMN_NAME in o_view:
|
||||
o_view.delete_column(REVERSE_QUALITY_COLUMN_NAME)
|
||||
|
||||
pb(i, force=True)
|
||||
print("", file=sys.stderr)
|
||||
|
||||
# Save command config in DMS comments
|
||||
command_line = " ".join(sys.argv[1:])
|
||||
o_view.write_config(config, "cat", command_line, input_dms_name=[d.name for d in idms_list], input_view_name=[v.name for v in iview_list])
|
||||
o_dms.record_command_line(command_line)
|
||||
|
||||
#print("\n\nOutput view:\n````````````", file=sys.stderr)
|
||||
#print(repr(view), file=sys.stderr)
|
||||
|
||||
for d in idms_list:
|
||||
d.close()
|
||||
o_dms.close()
|
||||
|
||||
logger("info", "Done.")
|
Reference in New Issue
Block a user