From f9999465825e8d619e9c081569ab282863fe44a4 Mon Sep 17 00:00:00 2001 From: Celine Mercier Date: Thu, 7 May 2020 17:05:54 +0200 Subject: [PATCH] obi uniq: fixed the remerging of already merged information, and efficiency improvements --- python/obitools3/commands/uniq.pxd | 4 +- python/obitools3/commands/uniq.pyx | 141 +++++++++++++++++++---------- 2 files changed, 93 insertions(+), 52 deletions(-) diff --git a/python/obitools3/commands/uniq.pxd b/python/obitools3/commands/uniq.pxd index 2f72d4a..40833c0 100755 --- a/python/obitools3/commands/uniq.pxd +++ b/python/obitools3/commands/uniq.pxd @@ -5,5 +5,5 @@ from obitools3.dms.taxo.taxo cimport Taxonomy from obitools3.dms.view.typed_view.view_NUC_SEQS cimport View_NUC_SEQS -cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) -cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=*, Taxonomy taxonomy=*, bint mergeIds=*, list categories=*, int max_elts=*) +cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy, dict config) +cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, dict config, list mergedKeys_list=*, Taxonomy taxonomy=*, bint mergeIds=*, list categories=*, int max_elts=*) diff --git a/python/obitools3/commands/uniq.pyx b/python/obitools3/commands/uniq.pyx index 668b6fa..5b9b532 100644 --- a/python/obitools3/commands/uniq.pyx +++ b/python/obitools3/commands/uniq.pyx @@ -56,7 +56,7 @@ def addOptions(parser): "(option can be used several times).") -cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : +cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy, dict config) : cdef int taxid cdef Nuc_Seq_Stored seq @@ -69,7 +69,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : cdef object gn_sn cdef object fa_sn - # Create columns + # Create columns and save them for efficiency if b"species" in o_view and 
o_view[b"species"].data_type_int != OBI_INT : o_view.delete_column(b"species") if b"species" not in o_view: @@ -77,6 +77,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"species", OBI_INT ) + species_column = o_view[b"species"] if b"genus" in o_view and o_view[b"genus"].data_type_int != OBI_INT : o_view.delete_column(b"genus") @@ -85,6 +86,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"genus", OBI_INT ) + genus_column = o_view[b"genus"] if b"family" in o_view and o_view[b"family"].data_type_int != OBI_INT : o_view.delete_column(b"family") @@ -93,6 +95,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"family", OBI_INT ) + family_column = o_view[b"family"] if b"species_name" in o_view and o_view[b"species_name"].data_type_int != OBI_STR : o_view.delete_column(b"species_name") @@ -101,6 +104,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"species_name", OBI_STR ) + species_name_column = o_view[b"species_name"] if b"genus_name" in o_view and o_view[b"genus_name"].data_type_int != OBI_STR : o_view.delete_column(b"genus_name") @@ -109,6 +113,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"genus_name", OBI_STR ) + genus_name_column = o_view[b"genus_name"] if b"family_name" in o_view and o_view[b"family_name"].data_type_int != OBI_STR : o_view.delete_column(b"family_name") @@ -117,6 +122,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"family_name", OBI_STR ) + family_name_column = o_view[b"family_name"] if b"rank" in o_view and o_view[b"rank"].data_type_int != OBI_STR : o_view.delete_column(b"rank") @@ -125,6 +131,7 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"rank", OBI_STR ) + rank_column = o_view[b"rank"] if b"scientific_name" in o_view and o_view[b"scientific_name"].data_type_int != OBI_STR : 
o_view.delete_column(b"scientific_name") @@ -133,9 +140,15 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : b"scientific_name", OBI_STR ) - - for seq in o_view: - PyErr_CheckSignals() + scientific_name_column = o_view[b"scientific_name"] + + # Initialize the progress bar + pb = ProgressBar(len(o_view), config, seconde=5) + + i=0 + for seq in o_view: + PyErr_CheckSignals() + pb(i) if MERGED_TAXID_COLUMN in seq : m_taxids = [] m_taxids_dict = seq[MERGED_TAXID_COLUMN] @@ -165,20 +178,23 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : else: fa_sn = None tfa = None - - seq[b"species"] = tsp - seq[b"genus"] = tgn - seq[b"family"] = tfa - - seq[b"species_name"] = sp_sn - seq[b"genus_name"] = gn_sn - seq[b"family_name"] = fa_sn - seq[b"rank"] = taxonomy.get_rank(taxid) - seq[b"scientific_name"] = taxonomy.get_scientific_name(taxid) + species_column[i] = tsp + genus_column[i] = tgn + family_column[i] = tfa + + species_name_column[i] = sp_sn + genus_name_column[i] = gn_sn + family_name_column[i] = fa_sn + + rank_column[i] = taxonomy.get_rank(taxid) + scientific_name_column[i] = taxonomy.get_scientific_name(taxid) + i+=1 + + pb(len(o_view), force=True) + - -cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=None, Taxonomy taxonomy=None, bint mergeIds=False, list categories=None, int max_elts=1000000) : +cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, dict config, list mergedKeys_list=None, Taxonomy taxonomy=None, bint mergeIds=False, list categories=None, int max_elts=1000000) : cdef int i cdef int k @@ -187,6 +203,7 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li cdef int u_idx cdef int i_idx cdef int i_count + cdef int o_count cdef str key_str cdef bytes key cdef bytes mkey @@ -209,7 +226,6 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li cdef Nuc_Seq_Stored i_seq 
cdef Nuc_Seq_Stored o_seq cdef Nuc_Seq_Stored u_seq - cdef Column i_col cdef Column i_seq_col cdef Column i_id_col cdef Column i_taxid_col @@ -217,6 +233,8 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li cdef Column o_id_col cdef Column o_taxid_dist_col cdef Column o_merged_col + cdef Column o_count_col + cdef Column i_count_col cdef Column_line i_mcol cdef object taxid_dist_dict cdef object iter_view @@ -252,7 +270,12 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li mergedKeys_m = [] for k in range(k_count): mergedKeys_m.append(MERGED_PREFIX + mergedKeys[k]) - + + # Check that not trying to remerge without total count information + for key in mergedKeys_m: + if key in view and COUNT_COLUMN not in view: + raise Exception("\n>>>>\nError: trying to re-merge tags without total count tag. Run obi annotate to add the count tag from the relevant merged tag, i.e.: \nobi annotate --set-tag COUNT:'sum([value for key,value in sequence['MERGED_sample'].items()])' dms/input dms/output\n") + if categories is None: categories = [] @@ -320,7 +343,11 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li for k in range(k_count): key = mergedKeys[k] merged_col_name = mergedKeys_m[k] - i_col = view[key] + + if merged_col_name in view: + i_col = view[merged_col_name] + else: + i_col = view[key] if merged_infos[merged_col_name]['nb_elts'] > max_elts: str_merged_cols.append(merged_col_name) @@ -374,12 +401,19 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li alias=MERGED_COLUMN ) - # Keep columns that are going to be used a lot in variables + # Keep columns in variables for efficiency o_id_col = o_view[ID_COLUMN] if TAXID_DIST_COLUMN in o_view: o_taxid_dist_col = o_view[TAXID_DIST_COLUMN] if MERGED_COLUMN in o_view: o_merged_col = o_view[MERGED_COLUMN] + if COUNT_COLUMN not in o_view: + Column.new_column(o_view, + COUNT_COLUMN, + OBI_INT) + o_count_col = 
o_view[COUNT_COLUMN] + if COUNT_COLUMN in view: + i_count_col = view[COUNT_COLUMN] pb(len(view), force=True) print("") @@ -407,7 +441,7 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li merged_list = list(set(merged_list)) # deduplicate the list o_merged_col[o_idx] = merged_list - o_seq[COUNT_COLUMN] = 0 + o_count = 0 if TAXID_DIST_COLUMN in u_seq and i_taxid_dist_col[u_idx] is not None: taxid_dist_dict = i_taxid_dist_col[u_idx] @@ -423,12 +457,12 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li i_id = i_id_col[i_idx] i_seq = view[i_idx] - if COUNT_COLUMN not in i_seq or i_seq[COUNT_COLUMN] is None: + if COUNT_COLUMN not in i_seq or i_count_col[i_idx] is None: i_count = 1 else: - i_count = i_seq[COUNT_COLUMN] + i_count = i_count_col[i_idx] - o_seq[COUNT_COLUMN] += i_count + o_count += i_count for k in range(k_count): @@ -463,44 +497,52 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li mcol[key2] = i_mcol[key2] else: mcol[key2] = mcol[key2] + i_mcol[key2] - - # Write taxid_dist - if mergeIds and TAXID_COLUMN in mergedKeys: - if TAXID_DIST_COLUMN in str_merged_cols: - o_taxid_dist_col[o_idx] = str(taxid_dist_dict) - else: - o_taxid_dist_col[o_idx] = taxid_dist_dict - - # Write merged dicts - for mkey in merged_dict: - if mkey in str_merged_cols: - mkey_cols[mkey][o_idx] = str(merged_dict[mkey]) - else: - mkey_cols[mkey][o_idx] = merged_dict[mkey] - # Sets NA values to 0 # TODO discuss, for now keep as None and test for None instead of testing for 0 in tools - #for key in mkey_cols[mkey][o_idx]: - # if mkey_cols[mkey][o_idx][key] is None: - # mkey_cols[mkey][o_idx][key] = 0 - + for key in i_seq.keys(): # Delete informations that differ between the merged sequences - # TODO make special columns list? + # TODO make special columns list? 
// could be more efficient if key != COUNT_COLUMN and key != ID_COLUMN and key != NUC_SEQUENCE_COLUMN and key in o_seq and o_seq[key] != i_seq[key] \ and key not in merged_dict : o_seq[key] = None + # Write merged dicts + for mkey in merged_dict: + if mkey in str_merged_cols: + mkey_cols[mkey][o_idx] = str(merged_dict[mkey]) + else: + mkey_cols[mkey][o_idx] = merged_dict[mkey] + # Sets NA values to 0 # TODO discuss, for now keep as None and test for None instead of testing for 0 in tools + #for key in mkey_cols[mkey][o_idx]: + # if mkey_cols[mkey][o_idx][key] is None: + # mkey_cols[mkey][o_idx][key] = 0 + + # Write taxid_dist + if mergeIds and TAXID_COLUMN in mergedKeys: + if TAXID_DIST_COLUMN in str_merged_cols: + o_taxid_dist_col[o_idx] = str(taxid_dist_dict) + else: + o_taxid_dist_col[o_idx] = taxid_dist_dict + + o_count_col[o_idx] = o_count o_idx += 1 + pb(len(uniques), force=True) + # Deletes quality columns if there is one because the matching between sequence and quality will be broken (quality set to NA when sequence not) if QUALITY_COLUMN in view: o_view.delete_column(QUALITY_COLUMN) if REVERSE_QUALITY_COLUMN in view: o_view.delete_column(REVERSE_QUALITY_COLUMN) + # Delete old columns that are now merged + for k in range(k_count): + if mergedKeys[k] in o_view: + o_view.delete_column(mergedKeys[k]) + if taxonomy is not None: print("") # TODO because in the middle of progress bar. Better solution? 
logger("info", "Merging taxonomy classification") - merge_taxonomy_classification(o_view, taxonomy) + merge_taxonomy_classification(o_view, taxonomy, config) @@ -547,11 +589,10 @@ def run(config): pb = ProgressBar(len(entries), config, seconde=5) try: - uniq_sequences(entries, o_view, pb, mergedKeys_list=config['uniq']['merge'], taxonomy=taxo, mergeIds=config['uniq']['mergeids'], categories=config['uniq']['categories'], max_elts=config['obi']['maxelts']) + uniq_sequences(entries, o_view, pb, config, mergedKeys_list=config['uniq']['merge'], taxonomy=taxo, mergeIds=config['uniq']['mergeids'], categories=config['uniq']['categories'], max_elts=config['obi']['maxelts']) except Exception, e: raise RollbackException("obi uniq error, rollbacking view: "+str(e), o_view) - pb(len(entries), force=True) print("", file=sys.stderr) # Save command config in View and DMS comments @@ -567,8 +608,8 @@ def run(config): #print("\n\nOutput view:\n````````````", file=sys.stderr) #print(repr(o_view), file=sys.stderr) - input[0].close() - output[0].close() + input[0].close(force=True) + output[0].close(force=True) logger("info", "Done.")