obi import: new option --preread to do a first readthrough of the
dataset if it contains huge dictionaries for a much faster import.
This commit is contained in:
@ -11,6 +11,7 @@ from obitools3.dms.column.column cimport Column
|
||||
from obitools3.dms.obiseq cimport Nuc_Seq
|
||||
from obitools3.dms import DMS
|
||||
from obitools3.dms.taxo.taxo cimport Taxonomy
|
||||
from obitools3.files.uncompress cimport CompressedFile
|
||||
|
||||
|
||||
from obitools3.utils cimport tobytes, \
|
||||
@ -65,6 +66,14 @@ def addOptions(parser):
|
||||
addTaxdumpInputOption(parser)
|
||||
addMinimalOutputOption(parser)
|
||||
|
||||
group = parser.add_argument_group('obi import specific options')
|
||||
|
||||
group.add_argument('--preread',
|
||||
action="store_true", dest="import:preread",
|
||||
default=False,
|
||||
help="Do a first readthrough of the dataset if it contains huge dictionaries (more than 100 keys) for "
|
||||
"a much faster import.")
|
||||
|
||||
|
||||
def run(config):
|
||||
|
||||
@ -169,8 +178,6 @@ def run(config):
|
||||
|
||||
if entry_count >= 0:
|
||||
pb = ProgressBar(entry_count, config, seconde=5)
|
||||
|
||||
entries = input[1]
|
||||
|
||||
NUC_SEQS_view = False
|
||||
if isinstance(output[1], View) :
|
||||
@ -188,6 +195,60 @@ def run(config):
|
||||
|
||||
dcols = {}
|
||||
|
||||
# First read through the entries to prepare columns with dictionaries as they are very time-expensive to rewrite
|
||||
if config['import']['preread']:
|
||||
logger("info", "First readthrough...")
|
||||
entries = input[1]
|
||||
i = 0
|
||||
dict_dict = {}
|
||||
for entry in entries:
|
||||
PyErr_CheckSignals()
|
||||
|
||||
if entry is None: # error or exception handled at lower level, not raised because Python generators can't resume after any exception is raised
|
||||
if config['obi']['skiperror']:
|
||||
i-=1
|
||||
continue
|
||||
else:
|
||||
raise Exception("obi import error in first readthrough")
|
||||
|
||||
if pb is not None:
|
||||
pb(i)
|
||||
elif not i%50000:
|
||||
logger("info", "Read %d entries", i)
|
||||
|
||||
for tag in entry :
|
||||
if type(entry[tag]) == dict :
|
||||
if tag in dict_dict:
|
||||
dict_dict[tag][0].update(entry[tag].keys())
|
||||
else:
|
||||
dict_dict[tag] = [set(entry[tag].keys()), get_obitype(entry[tag])]
|
||||
i+=1
|
||||
|
||||
if pb is not None:
|
||||
pb(i, force=True)
|
||||
print("", file=sys.stderr)
|
||||
|
||||
for tag in dict_dict:
|
||||
dcols[tag] = (Column.new_column(view, tag, dict_dict[tag][1], \
|
||||
nb_elements_per_line=len(dict_dict[tag][0]), \
|
||||
elements_names=list(dict_dict[tag][0])), \
|
||||
value_obitype)
|
||||
|
||||
|
||||
# Reinitialize the input
|
||||
if isinstance(input[0], CompressedFile):
|
||||
input_is_file = True
|
||||
if entry_count >= 0:
|
||||
pb = ProgressBar(entry_count, config, seconde=5)
|
||||
try:
|
||||
input[0].close()
|
||||
except AttributeError:
|
||||
pass
|
||||
input = open_uri(config['obi']['inputURI'], force_file=input_is_file)
|
||||
if input is None:
|
||||
raise Exception("Could not open input URI")
|
||||
|
||||
entries = input[1]
|
||||
i = 0
|
||||
for entry in entries :
|
||||
|
||||
|
Reference in New Issue
Block a user