diff --git a/python/obitools3/commands/uniq.pxd b/python/obitools3/commands/uniq.pxd index db21652..2f72d4a 100644 --- a/python/obitools3/commands/uniq.pxd +++ b/python/obitools3/commands/uniq.pxd @@ -6,4 +6,4 @@ from obitools3.dms.view.typed_view.view_NUC_SEQS cimport View_NUC_SEQS cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) -cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=*, Taxonomy taxonomy=*, bint mergeIds=*, list categories=*) +cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=*, Taxonomy taxonomy=*, bint mergeIds=*, list categories=*, int max_elts=*) diff --git a/python/obitools3/commands/uniq.pyx b/python/obitools3/commands/uniq.pyx index db76ac7..24cfc0a 100644 --- a/python/obitools3/commands/uniq.pyx +++ b/python/obitools3/commands/uniq.pyx @@ -11,9 +11,10 @@ from obitools3.dms.capi.obitypes cimport OBI_INT, OBI_STR, index_t from obitools3.apps.optiongroups import addSequenceInputOption, addMinimalOutputOption, addTaxonomyInputOption from obitools3.uri.decode import open_uri from obitools3.apps.config import logger +from obitools3.utils cimport tobytes -__title__="Groups records together" +__title__="Groups sequence records together" @@ -60,79 +61,79 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : cdef object fa_sn # Create columns - if 'species' in o_view and o_view['species'].data_type_int != OBI_INT : - o_view.delete_column('species') - if 'species' not in o_view: + if b"species" in o_view and o_view[b"species"].data_type_int != OBI_INT : + o_view.delete_column(b"species") + if b"species" not in o_view: Column.new_column(o_view, - 'species', + b"species", OBI_INT ) - if 'genus' in o_view and o_view['genus'].data_type_int != OBI_INT : - o_view.delete_column('genus') - if 'genus' not in o_view: + if b"genus" in o_view and o_view[b"genus"].data_type_int != OBI_INT : + 
o_view.delete_column(b"genus") + if b"genus" not in o_view: Column.new_column(o_view, - 'genus', + b"genus", OBI_INT ) - if 'family' in o_view and o_view['family'].data_type_int != OBI_INT : - o_view.delete_column('family') - if 'family' not in o_view: + if b"family" in o_view and o_view[b"family"].data_type_int != OBI_INT : + o_view.delete_column(b"family") + if b"family" not in o_view: Column.new_column(o_view, - 'family', + b"family", OBI_INT ) - if 'species_name' in o_view and o_view['species_name'].data_type_int != OBI_STR : - o_view.delete_column('species_name') - if 'species_name' not in o_view: + if b"species_name" in o_view and o_view[b"species_name"].data_type_int != OBI_STR : + o_view.delete_column(b"species_name") + if b"species_name" not in o_view: Column.new_column(o_view, - 'species_name', + b"species_name", OBI_STR ) - if 'genus_name' in o_view and o_view['genus_name'].data_type_int != OBI_STR : - o_view.delete_column('genus_name') - if 'genus_name' not in o_view: + if b"genus_name" in o_view and o_view[b"genus_name"].data_type_int != OBI_STR : + o_view.delete_column(b"genus_name") + if b"genus_name" not in o_view: Column.new_column(o_view, - 'genus_name', + b"genus_name", OBI_STR ) - if 'family_name' in o_view and o_view['family_name'].data_type_int != OBI_STR : - o_view.delete_column('family_name') - if 'family_name' not in o_view: + if b"family_name" in o_view and o_view[b"family_name"].data_type_int != OBI_STR : + o_view.delete_column(b"family_name") + if b"family_name" not in o_view: Column.new_column(o_view, - 'family_name', + b"family_name", OBI_STR ) - if 'rank' in o_view and o_view['rank'].data_type_int != OBI_STR : - o_view.delete_column('rank') - if 'rank' not in o_view: + if b"rank" in o_view and o_view[b"rank"].data_type_int != OBI_STR : + o_view.delete_column(b"rank") + if b"rank" not in o_view: Column.new_column(o_view, - 'rank', + b"rank", OBI_STR ) - if 'scientific_name' in o_view and o_view['scientific_name'].data_type_int != 
OBI_STR : - o_view.delete_column('scientific_name') - if 'scientific_name' not in o_view: + if b"scientific_name" in o_view and o_view[b"scientific_name"].data_type_int != OBI_STR : + o_view.delete_column(b"scientific_name") + if b"scientific_name" not in o_view: Column.new_column(o_view, - 'scientific_name', + b"scientific_name", OBI_STR ) - - for seq in o_view: - if 'merged_taxid' in seq : - m_taxids = [] - for k in seq['merged_taxid'].keys() : - if seq['merged_taxid'][k] is not None: + for seq in o_view: + if b"merged_taxid" in seq : + m_taxids = [] + m_taxids_dict = seq[b"merged_taxid"] + for k in m_taxids_dict.keys() : + if m_taxids_dict[k] is not None: m_taxids.append(int(k)) taxid = taxonomy.last_common_taxon(*m_taxids) - seq['taxid'] = taxid + seq[b"taxid"] = taxid tsp = taxonomy.get_species(taxid) tgn = taxonomy.get_genus(taxid) tfa = taxonomy.get_family(taxid) @@ -155,72 +156,126 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) : fa_sn = None tfa = None - seq['species'] = tsp - seq['genus'] = tgn - seq['family'] = tfa + seq[b"species"] = tsp + seq[b"genus"] = tgn + seq[b"family"] = tfa - seq['species_name'] = sp_sn - seq['genus_name'] = gn_sn - seq['family_name'] = fa_sn + seq[b"species_name"] = sp_sn + seq[b"genus_name"] = gn_sn + seq[b"family_name"] = fa_sn - seq['rank'] = taxonomy.get_rank(taxid) - seq['scientific_name'] = taxonomy.get_scientific_name(taxid) + seq[b"rank"] = taxonomy.get_rank(taxid) + seq[b"scientific_name"] = taxonomy.get_scientific_name(taxid) -cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=None, Taxonomy taxonomy=None, bint mergeIds=False, list categories=None) : +cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=None, Taxonomy taxonomy=None, bint mergeIds=False, list categories=None, int max_elts=10000) : cdef int i + cdef int k + cdef int k_count cdef int o_idx cdef int u_idx - cdef tuple u_id + cdef 
int i_idx cdef int i_count - cdef set mergedKeys + cdef str key_str + cdef bytes key + cdef bytes mkey + cdef bytes merged_col_name + cdef bytes o_id + cdef bytes i_id + cdef set mergedKeys_set + cdef tuple unique_id + cdef list catl + cdef list mergedKeys + cdef list mergedKeys_list_b + cdef list mergedKeys_m + cdef list str_merged_cols + cdef list merged_sequences cdef dict uniques cdef dict merged_infos - cdef object iter_view + cdef dict mkey_infos + cdef dict merged_dict + cdef dict mkey_cols cdef Line i_seq cdef Line o_seq - cdef str key - cdef bytes key_b - cdef str mkey - cdef str merged_col_name + cdef Line u_seq cdef Column i_col - cdef Column seq_col - cdef object to_merge - cdef Column_line mcol + cdef Column i_seq_col + cdef Column i_id_col + cdef Column i_taxid_col + cdef Column i_taxid_dist_col + cdef Column o_id_col + cdef Column o_taxid_dist_col + cdef Column o_merged_col cdef Column_line i_mcol - cdef list catl - cdef dict merged_ids_dict - cdef int nb_ids_max - cdef set ids_list - cdef bytes k - cdef int n - cdef dict taxid_dist_dict + cdef object taxid_dist_dict + cdef object iter_view + cdef object mcol + cdef object to_merge #print(categories) - + uniques = {} - - if mergedKeys_list is not None: - mergedKeys=set(mergedKeys_list) - else: - mergedKeys=set() + mergedKeys_list_b = [] + if mergedKeys_list is not None: + for key_str in mergedKeys_list: + mergedKeys_list_b.append(tobytes(key_str)) + mergedKeys_set=set(mergedKeys_list_b) + else: + mergedKeys_set=set() + if taxonomy is not None: - mergedKeys.add('taxid') - + mergedKeys_set.add(b"taxid") + + mergedKeys = list(mergedKeys_set) + + k_count = len(mergedKeys) + + mergedKeys_m = [] + for k in range(k_count): + mergedKeys_m.append(b"merged_" + mergedKeys[k]) + if categories is None: categories = [] - - # Going through columns to merge a first time to create merged columns with the good number of elements per line and elemnts names + + # Keep columns that are going to be used a lot in variables 
+ i_seq_col = view[NUC_SEQUENCE_COLUMN] + i_id_col = view[ID_COLUMN] + i_taxid_col = view[b"taxid"] + if b"taxid_dist" in view: + i_taxid_dist_col = view[b"taxid_dist"] + + + # First browsing + i = 0 + o_idx = 0 + logger("info", "First browsing through the input") merged_infos = {} - i = 0 iter_view = iter(view) - for i_seq in iter_view: + for i_seq in iter_view : pb(i) - for key in mergedKeys: - mkey = "merged_%s" % key + + # This can't be done in the same line as the unique_id tuple creation because it generates a bug + # where Cython (version 0.25.2) does not detect the reference to the categs_list variable and deallocates + # it at the beginning of the function. + # (Only happens if categs_list is an optional parameter, which it is). + catl = [] + for x in categories : + catl.append(i_seq[x]) + + unique_id = tuple(catl) + (i_seq_col.get_line_idx(i),) + #unique_id = tuple(i_seq[x] for x in categories) + (seq_col.get_line_idx(i),) # The line that cython can't read properly + + if unique_id in uniques: + uniques[unique_id].append(i) + else: + uniques[unique_id] = [i] + + for k in range(k_count): + key = mergedKeys[k] + mkey = mergedKeys_m[k] if key in i_seq: # TODO what if mkey already in i_seq? 
if mkey not in merged_infos: merged_infos[mkey] = {} @@ -234,166 +289,183 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li mkey_infos['nb_elts'] += 1 i+=1 - for key in mergedKeys: - merged_col_name = "merged_%s" % key + # Create merged columns + str_merged_cols = [] + mkey_cols = {} + for k in range(k_count): + key = mergedKeys[k] + merged_col_name = mergedKeys_m[k] i_col = view[key] - Column.new_column(o_view, - merged_col_name, - OBI_INT, - nb_elements_per_line=merged_infos[merged_col_name]['nb_elts'], - elements_names=merged_infos[merged_col_name]['elt_names'], - comments=i_col.comments, - alias=merged_col_name # TODO what if it already exists - ) - + if merged_infos[merged_col_name]['nb_elts'] > max_elts: + str_merged_cols.append(merged_col_name) + Column.new_column(o_view, + merged_col_name, + OBI_STR, + to_eval=True, + comments=i_col.comments, + alias=merged_col_name # TODO what if it already exists + ) + + else: + Column.new_column(o_view, + merged_col_name, + OBI_INT, + nb_elements_per_line=merged_infos[merged_col_name]['nb_elts'], + elements_names=list(merged_infos[merged_col_name]['elt_names']), + comments=i_col.comments, + alias=merged_col_name # TODO what if it already exists + ) + + mkey_cols[merged_col_name] = o_view[merged_col_name] + + # taxid_dist column + if mergeIds and b"taxid" in mergedKeys: + if len(view) > max_elts: #The number of different IDs corresponds to the number of sequences in the view + str_merged_cols.append(b"taxid_dist") + Column.new_column(o_view, + b"taxid_dist", + OBI_STR, + to_eval=True, + comments=b"obi uniq taxid dist, stored as character strings to be read as dict", + alias=b"taxid_dist" # TODO what if it already exists + ) + else: + Column.new_column(o_view, + b"taxid_dist", + OBI_INT, + nb_elements_per_line=len(view), + elements_names=[id for id in i_id_col], + comments=b"obi uniq taxid dist", + alias=b"taxid_dist" # TODO what if it already exists + ) + + del(merged_infos) + # Merged 
ids column + if mergeIds : + Column.new_column(o_view, + b"merged", + OBI_STR, + tuples=True, + comments=b"obi uniq merged ids", + alias=b"merged" # TODO what if it already exists + ) + + + # Keep columns that are going to be used a lot in variables + o_id_col = o_view[ID_COLUMN] + if b"taxid_dist" in o_view: + o_taxid_dist_col = o_view[b"taxid_dist"] + if b"merged" in o_view: + o_merged_col = o_view[b"merged"] + logger("info", "Second browsing through the input") - merged_ids_dict = {} - taxid_dist_dict = {} - i = 0 + # Initialize the progress bar + pb = ProgressBar(len(uniques), seconde=5) o_idx = 0 - seq_col = view[NUC_SEQUENCE_COLUMN] - iter_view = iter(view) - for i_seq in iter_view : - pb(i) + for unique_id in uniques : + pb(o_idx) - # This can't be done in the same line as the u_id tuple creation because it generates a bug - # where Cython (version 0.25.2) does not detect the reference to the categs_list variable and deallocates - # it at the beginning of the function. - # (Only happens if categs_list is an optional parameter, which it is). 
- catl = [] - for x in categories : - catl.append(i_seq[x]) + merged_sequences = uniques[unique_id] - u_id = tuple(catl) + (seq_col.get_line_idx(i),) - #u_id = tuple(i_seq[x] for x in categories) + (seq_col.get_line_idx(i),) # The line that cython can't read properly + u_idx = uniques[unique_id][0] + u_seq = view[u_idx] + o_view[o_idx] = u_seq + o_seq = o_view[o_idx] + o_id = o_seq.id - if u_id in uniques: + if mergeIds: + o_merged_col[o_idx] = [view[idx].id for idx in merged_sequences] + + if COUNT_COLUMN not in o_seq or o_seq[COUNT_COLUMN] is None: + o_seq[COUNT_COLUMN] = 1 + + if b"taxid_dist" in u_seq and i_taxid_dist_col[u_idx] is not None: + taxid_dist_dict = i_taxid_dist_col[u_idx] + else: + taxid_dist_dict = {} + + for i_idx in merged_sequences: + i_id = i_id_col[i_idx] + i_seq = view[i_idx] + if COUNT_COLUMN not in i_seq or i_seq[COUNT_COLUMN] is None: i_count = 1 else: i_count = i_seq[COUNT_COLUMN] - - u_idx = uniques[u_id] - o_seq = o_view[u_idx] + o_seq[COUNT_COLUMN] += i_count - - for key in mergedKeys: - if key=='taxid' and mergeIds: - if 'taxid_dist' in i_seq: - taxid_dist_dict[o_seq.id].update(i_seq["taxid_dist"]) - if 'taxid' in i_seq: - taxid_dist_dict[o_seq.id][i_seq.id] = i_seq['taxid'] + + merged_dict = {} + for mkey in mergedKeys_m: + merged_dict[mkey] = {} + + for k in range(k_count): - mkey = "merged_%s" % key + key = mergedKeys[k] + mkey = mergedKeys_m[k] + + if key==b"taxid" and mergeIds: + if b"taxid_dist" in i_seq: + taxid_dist_dict.update(i_taxid_dist_col[i_idx]) + if b"taxid" in i_seq: + taxid_dist_dict[i_id] = i_taxid_col[i_idx] + #cas ou on met a jour les merged_keys mais il n'y a pas de merged_keys dans la sequence qui arrive if key in i_seq: to_merge = i_seq[key] - if type(to_merge) != bytes : - to_merge = str(to_merge) - mcol = o_seq[mkey] - if mcol[to_merge] is None: - mcol[to_merge] = i_count - else: - mcol[to_merge] = mcol[to_merge] + i_count + if to_merge is not None: + if type(to_merge) != bytes : + to_merge = 
tobytes(str(to_merge)) + mcol = merged_dict[mkey] + if to_merge not in mcol or mcol[to_merge] is None: + mcol[to_merge] = i_count + else: + mcol[to_merge] = mcol[to_merge] + i_count + o_seq[key] = None #cas ou merged_keys existe deja else: # TODO is this a good else if mkey in i_seq: - mcol = o_seq[mkey] + mcol = merged_dict[mkey] i_mcol = i_seq[mkey] - for key_b in i_mcol: - if mcol[key_b] is None: - mcol[key_b] = i_mcol[key_b] - else: - mcol[key_b] = mcol[key_b] + i_mcol[key_b] - - for key_b in i_seq.keys(): - # Merger proprement l'attribut merged s'il existe + if i_mcol is not None: + for key2 in i_mcol: + if mcol[key2] is None: + mcol[key2] = i_mcol[key2] + else: + mcol[key2] = mcol[key2] + i_mcol[key2] + + # Write taxid_dist + if mergeIds and b"taxid" in mergedKeys: + if b"taxid_dist" in str_merged_cols: + o_taxid_dist_col[o_idx] = str(taxid_dist_dict) + else: + o_taxid_dist_col[o_idx] = taxid_dist_dict + + # Write merged dicts + for mkey in merged_dict: + if mkey in str_merged_cols: + mkey_cols[mkey][o_idx] = str(merged_dict[mkey]) + else: + mkey_cols[mkey][o_idx] = merged_dict[mkey] + + for key in i_seq.keys(): + # Delete informations that differ between the merged sequences # TODO make special columns list? 
- if key_b != COUNT_COLUMN and key_b != ID_COLUMN and key_b != NUC_SEQUENCE_COLUMN and key_b in o_seq and o_seq[key_b] != i_seq[key_b] : - o_seq[key_b] = None - - if mergeIds : - merged_ids_dict[o_seq.id].append(i_seq.id) - - else: - o_view[o_idx] = i_seq - o_seq = o_view[o_idx] - uniques[u_id] = o_idx - o_idx += 1 - - if COUNT_COLUMN not in o_seq or o_seq[COUNT_COLUMN] is None: - o_seq[COUNT_COLUMN] = 1 - - for key in mergedKeys: - if key=='taxid' and mergeIds: - if 'taxid' in o_seq and 'taxid_dist' not in o_seq: - taxid_dist_dict[i_seq.id] = {} - if 'taxid' in o_seq: - taxid_dist_dict[i_seq.id][o_seq.id] = o_seq['taxid'] - mkey = "merged_%s" % key - if key in o_seq: - to_merge = o_seq[key] - if type(to_merge) != bytes : - to_merge = str(to_merge) - mcol = o_seq[mkey] - if to_merge in mcol and mcol[to_merge] is not None: - mcol[to_merge] = mcol[to_merge] + o_seq[COUNT_COLUMN] - else: - mcol[to_merge] = o_seq[COUNT_COLUMN] - o_seq[key] = None # TODO delete column eventually -> make C function that deletes 'empty' columns - - if mergeIds: - merged_ids_dict[o_seq.id] = [o_seq.id] - - i+=1 - - # Merged ids column - if mergeIds : - logger("info", "Building merged ids column") - nb_ids_max = 0 - ids_list = set() - for k in merged_ids_dict : - ids_list = set(list(ids_list) + merged_ids_dict[k]) - n = len(merged_ids_dict[k]) - if n > nb_ids_max : - nb_ids_max = n - - Column.new_column(o_view, - "merged", - OBI_STR, - nb_elements_per_line=nb_ids_max, - elements_names=None, - comments="obi uniq merged ids", - alias="merged" # TODO what if it already exists - ) - - for o_seq in o_view: - o_seq['merged'] = merged_ids_dict[o_seq.id] - - if mergeIds and 'taxid' in mergedKeys: - logger("info", "Building taxid dist column") - Column.new_column(o_view, - "taxid_dist", - OBI_INT, - nb_elements_per_line=len(ids_list), - elements_names=list(ids_list), - comments="obi uniq taxid dist", - alias="taxid_dist" # TODO what if it already exists - ) - - for o_seq in o_view: - if o_seq.id in 
taxid_dist_dict : - o_seq['taxid_dist'] = taxid_dist_dict[o_seq.id] + if key != COUNT_COLUMN and key != ID_COLUMN and key != NUC_SEQUENCE_COLUMN and key in o_seq and o_seq[key] != i_seq[key] : + o_seq[key] = None + o_idx += 1 + if taxonomy is not None: logger("info", "Merging taxonomy classification") merge_taxonomy_classification(o_view, taxonomy) + def run(config): cdef tuple input @@ -417,12 +489,16 @@ def run(config): input=False, newviewtype=View_NUC_SEQS) - # TODO exceptions not handled like they should be + if output is None: + raise Exception("Could not create output view") entries = input[1] o_view = output[1] + if 'taxoURI' in config['obi'] : # TODO default None problem taxo_uri = open_uri(config['obi']['taxoURI']) + if taxo_uri is None: + raise RollbackException("Couldn't open taxonomy, rollbacking view", o_view) taxo = taxo_uri[1] else : taxo = None @@ -431,13 +507,13 @@ def run(config): pb = ProgressBar(len(entries), config, seconde=5) try: - uniq_sequences(entries, o_view, pb, mergedKeys_list=config['uniq']['merge'], taxonomy=taxo, mergeIds=config['uniq']['mergeids'], categories=config['uniq']['categories']) - except: - raise RollbackException("obi uniq error, rollbacking view", o_view) + uniq_sequences(entries, o_view, pb, mergedKeys_list=config['uniq']['merge'], taxonomy=taxo, mergeIds=config['uniq']['mergeids'], categories=config['uniq']['categories'], max_elts=config['obi']['maxelts']) + except Exception as e: + raise RollbackException("obi uniq error, rollbacking view: "+str(e), o_view) print("\n") print(repr(o_view)) - + input[0].close() output[0].close()