New obi uniq: stores columns with too many elements per line as

character strings, and keeps memory usage to a minimum
This commit is contained in:
Celine Mercier
2017-12-13 22:49:08 +01:00
parent 1fd3323372
commit e9e7fac999
2 changed files with 292 additions and 216 deletions

View File

@ -6,4 +6,4 @@ from obitools3.dms.view.typed_view.view_NUC_SEQS cimport View_NUC_SEQS
cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy)
cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=*, Taxonomy taxonomy=*, bint mergeIds=*, list categories=*)
cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=*, Taxonomy taxonomy=*, bint mergeIds=*, list categories=*, int max_elts=*)

View File

@ -11,9 +11,10 @@ from obitools3.dms.capi.obitypes cimport OBI_INT, OBI_STR, index_t
from obitools3.apps.optiongroups import addSequenceInputOption, addMinimalOutputOption, addTaxonomyInputOption
from obitools3.uri.decode import open_uri
from obitools3.apps.config import logger
from obitools3.utils cimport tobytes
__title__="Groups records together"
__title__="Groups sequence records together"
@ -60,79 +61,79 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) :
cdef object fa_sn
# Create columns
if 'species' in o_view and o_view['species'].data_type_int != OBI_INT :
o_view.delete_column('species')
if 'species' not in o_view:
if b"species" in o_view and o_view[b"species"].data_type_int != OBI_INT :
o_view.delete_column(b"species")
if b"species" not in o_view:
Column.new_column(o_view,
'species',
b"species",
OBI_INT
)
if 'genus' in o_view and o_view['genus'].data_type_int != OBI_INT :
o_view.delete_column('genus')
if 'genus' not in o_view:
if b"genus" in o_view and o_view[b"genus"].data_type_int != OBI_INT :
o_view.delete_column(b"genus")
if b"genus" not in o_view:
Column.new_column(o_view,
'genus',
b"genus",
OBI_INT
)
if 'family' in o_view and o_view['family'].data_type_int != OBI_INT :
o_view.delete_column('family')
if 'family' not in o_view:
if b"family" in o_view and o_view[b"family"].data_type_int != OBI_INT :
o_view.delete_column(b"family")
if b"family" not in o_view:
Column.new_column(o_view,
'family',
b"family",
OBI_INT
)
if 'species_name' in o_view and o_view['species_name'].data_type_int != OBI_STR :
o_view.delete_column('species_name')
if 'species_name' not in o_view:
if b"species_name" in o_view and o_view[b"species_name"].data_type_int != OBI_STR :
o_view.delete_column(b"species_name")
if b"species_name" not in o_view:
Column.new_column(o_view,
'species_name',
b"species_name",
OBI_STR
)
if 'genus_name' in o_view and o_view['genus_name'].data_type_int != OBI_STR :
o_view.delete_column('genus_name')
if 'genus_name' not in o_view:
if b"genus_name" in o_view and o_view[b"genus_name"].data_type_int != OBI_STR :
o_view.delete_column(b"genus_name")
if b"genus_name" not in o_view:
Column.new_column(o_view,
'genus_name',
b"genus_name",
OBI_STR
)
if 'family_name' in o_view and o_view['family_name'].data_type_int != OBI_STR :
o_view.delete_column('family_name')
if 'family_name' not in o_view:
if b"family_name" in o_view and o_view[b"family_name"].data_type_int != OBI_STR :
o_view.delete_column(b"family_name")
if b"family_name" not in o_view:
Column.new_column(o_view,
'family_name',
b"family_name",
OBI_STR
)
if 'rank' in o_view and o_view['rank'].data_type_int != OBI_STR :
o_view.delete_column('rank')
if 'rank' not in o_view:
if b"rank" in o_view and o_view[b"rank"].data_type_int != OBI_STR :
o_view.delete_column(b"rank")
if b"rank" not in o_view:
Column.new_column(o_view,
'rank',
b"rank",
OBI_STR
)
if 'scientific_name' in o_view and o_view['scientific_name'].data_type_int != OBI_STR :
o_view.delete_column('scientific_name')
if 'scientific_name' not in o_view:
if b"scientific_name" in o_view and o_view[b"scientific_name"].data_type_int != OBI_STR :
o_view.delete_column(b"scientific_name")
if b"scientific_name" not in o_view:
Column.new_column(o_view,
'scientific_name',
b"scientific_name",
OBI_STR
)
for seq in o_view:
if 'merged_taxid' in seq :
if b"merged_taxid" in seq :
m_taxids = []
for k in seq['merged_taxid'].keys() :
if seq['merged_taxid'][k] is not None:
m_taxids_dict = seq[b"merged_taxid"]
for k in m_taxids_dict.keys() :
if m_taxids_dict[k] is not None:
m_taxids.append(int(k))
taxid = taxonomy.last_common_taxon(*m_taxids)
seq['taxid'] = taxid
seq[b"taxid"] = taxid
tsp = taxonomy.get_species(taxid)
tgn = taxonomy.get_genus(taxid)
tfa = taxonomy.get_family(taxid)
@ -155,72 +156,126 @@ cdef merge_taxonomy_classification(View_NUC_SEQS o_view, Taxonomy taxonomy) :
fa_sn = None
tfa = None
seq['species'] = tsp
seq['genus'] = tgn
seq['family'] = tfa
seq[b"species"] = tsp
seq[b"genus"] = tgn
seq[b"family"] = tfa
seq['species_name'] = sp_sn
seq['genus_name'] = gn_sn
seq['family_name'] = fa_sn
seq[b"species_name"] = sp_sn
seq[b"genus_name"] = gn_sn
seq[b"family_name"] = fa_sn
seq['rank'] = taxonomy.get_rank(taxid)
seq['scientific_name'] = taxonomy.get_scientific_name(taxid)
seq[b"rank"] = taxonomy.get_rank(taxid)
seq[b"scientific_name"] = taxonomy.get_scientific_name(taxid)
cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=None, Taxonomy taxonomy=None, bint mergeIds=False, list categories=None) :
cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, list mergedKeys_list=None, Taxonomy taxonomy=None, bint mergeIds=False, list categories=None, int max_elts=10000) :
cdef int i
cdef int k
cdef int k_count
cdef int o_idx
cdef int u_idx
cdef tuple u_id
cdef int i_idx
cdef int i_count
cdef set mergedKeys
cdef str key_str
cdef bytes key
cdef bytes mkey
cdef bytes merged_col_name
cdef bytes o_id
cdef bytes i_id
cdef set mergedKeys_set
cdef tuple unique_id
cdef list catl
cdef list mergedKeys
cdef list mergedKeys_list_b
cdef list mergedKeys_m
cdef list str_merged_cols
cdef list merged_sequences
cdef dict uniques
cdef dict merged_infos
cdef object iter_view
cdef dict mkey_infos
cdef dict merged_dict
cdef dict mkey_cols
cdef Line i_seq
cdef Line o_seq
cdef str key
cdef bytes key_b
cdef str mkey
cdef str merged_col_name
cdef Line u_seq
cdef Column i_col
cdef Column seq_col
cdef object to_merge
cdef Column_line mcol
cdef Column i_seq_col
cdef Column i_id_col
cdef Column i_taxid_col
cdef Column i_taxid_dist_col
cdef Column o_id_col
cdef Column o_taxid_dist_col
cdef Column o_merged_col
cdef Column_line i_mcol
cdef list catl
cdef dict merged_ids_dict
cdef int nb_ids_max
cdef set ids_list
cdef bytes k
cdef int n
cdef dict taxid_dist_dict
cdef object taxid_dist_dict
cdef object iter_view
cdef object mcol
cdef object to_merge
#print(categories)
uniques = {}
mergedKeys_list_b = []
if mergedKeys_list is not None:
mergedKeys=set(mergedKeys_list)
for key_str in mergedKeys_list:
mergedKeys_list_b.append(tobytes(key_str))
mergedKeys_set=set(mergedKeys_list_b)
else:
mergedKeys=set()
mergedKeys_set=set()
if taxonomy is not None:
mergedKeys.add('taxid')
mergedKeys_set.add(b"taxid")
mergedKeys = list(mergedKeys_set)
k_count = len(mergedKeys)
mergedKeys_m = []
for k in range(k_count):
mergedKeys_m.append(b"merged_" + mergedKeys[k])
if categories is None:
categories = []
# Going through columns to merge a first time to create merged columns with the right number of elements per line and element names
# Keep columns that are going to be used a lot in variables
i_seq_col = view[NUC_SEQUENCE_COLUMN]
i_id_col = view[ID_COLUMN]
i_taxid_col = view[b"taxid"]
if b"taxid_dist" in view:
i_taxid_dist_col = view[b"taxid_dist"]
# First browsing
i = 0
o_idx = 0
logger("info", "First browsing through the input")
merged_infos = {}
i = 0
iter_view = iter(view)
for i_seq in iter_view:
for i_seq in iter_view :
pb(i)
for key in mergedKeys:
mkey = "merged_%s" % key
# This can't be done in the same line as the unique_id tuple creation because it generates a bug
# where Cython (version 0.25.2) does not detect the reference to the categs_list variable and deallocates
# it at the beginning of the function.
# (Only happens if categs_list is an optional parameter, which it is).
catl = []
for x in categories :
catl.append(i_seq[x])
unique_id = tuple(catl) + (i_seq_col.get_line_idx(i),)
#unique_id = tuple(i_seq[x] for x in categories) + (seq_col.get_line_idx(i),) # The line that cython can't read properly
if unique_id in uniques:
uniques[unique_id].append(i)
else:
uniques[unique_id] = [i]
for k in range(k_count):
key = mergedKeys[k]
mkey = mergedKeys_m[k]
if key in i_seq: # TODO what if mkey already in i_seq?
if mkey not in merged_infos:
merged_infos[mkey] = {}
@ -234,166 +289,183 @@ cdef uniq_sequences(View_NUC_SEQS view, View_NUC_SEQS o_view, ProgressBar pb, li
mkey_infos['nb_elts'] += 1
i+=1
for key in mergedKeys:
merged_col_name = "merged_%s" % key
# Create merged columns
str_merged_cols = []
mkey_cols = {}
for k in range(k_count):
key = mergedKeys[k]
merged_col_name = mergedKeys_m[k]
i_col = view[key]
Column.new_column(o_view,
merged_col_name,
OBI_INT,
nb_elements_per_line=merged_infos[merged_col_name]['nb_elts'],
elements_names=merged_infos[merged_col_name]['elt_names'],
comments=i_col.comments,
alias=merged_col_name # TODO what if it already exists
)
if merged_infos[merged_col_name]['nb_elts'] > max_elts:
str_merged_cols.append(merged_col_name)
Column.new_column(o_view,
merged_col_name,
OBI_STR,
to_eval=True,
comments=i_col.comments,
alias=merged_col_name # TODO what if it already exists
)
else:
Column.new_column(o_view,
merged_col_name,
OBI_INT,
nb_elements_per_line=merged_infos[merged_col_name]['nb_elts'],
elements_names=list(merged_infos[merged_col_name]['elt_names']),
comments=i_col.comments,
alias=merged_col_name # TODO what if it already exists
)
mkey_cols[merged_col_name] = o_view[merged_col_name]
# taxid_dist column
if mergeIds and b"taxid" in mergedKeys:
if len(view) > max_elts: #The number of different IDs corresponds to the number of sequences in the view
str_merged_cols.append(b"taxid_dist")
Column.new_column(o_view,
b"taxid_dist",
OBI_STR,
to_eval=True,
comments=b"obi uniq taxid dist, stored as character strings to be read as dict",
alias=b"taxid_dist" # TODO what if it already exists
)
else:
Column.new_column(o_view,
b"taxid_dist",
OBI_INT,
nb_elements_per_line=len(view),
elements_names=[id for id in i_id_col],
comments=b"obi uniq taxid dist",
alias=b"taxid_dist" # TODO what if it already exists
)
del(merged_infos)
# Merged ids column
if mergeIds :
Column.new_column(o_view,
b"merged",
OBI_STR,
tuples=True,
comments=b"obi uniq merged ids",
alias=b"merged" # TODO what if it already exists
)
# Keep columns that are going to be used a lot in variables
o_id_col = o_view[ID_COLUMN]
if b"taxid_dist" in o_view:
o_taxid_dist_col = o_view[b"taxid_dist"]
if b"merged" in o_view:
o_merged_col = o_view[b"merged"]
logger("info", "Second browsing through the input")
merged_ids_dict = {}
taxid_dist_dict = {}
i = 0
# Initialize the progress bar
pb = ProgressBar(len(uniques), seconde=5)
o_idx = 0
seq_col = view[NUC_SEQUENCE_COLUMN]
iter_view = iter(view)
for i_seq in iter_view :
pb(i)
for unique_id in uniques :
pb(o_idx)
# This can't be done in the same line as the u_id tuple creation because it generates a bug
# where Cython (version 0.25.2) does not detect the reference to the categs_list variable and deallocates
# it at the beginning of the function.
# (Only happens if categs_list is an optional parameter, which it is).
catl = []
for x in categories :
catl.append(i_seq[x])
merged_sequences = uniques[unique_id]
u_id = tuple(catl) + (seq_col.get_line_idx(i),)
#u_id = tuple(i_seq[x] for x in categories) + (seq_col.get_line_idx(i),) # The line that cython can't read properly
u_idx = uniques[unique_id][0]
u_seq = view[u_idx]
o_view[o_idx] = u_seq
o_seq = o_view[o_idx]
o_id = o_seq.id
if u_id in uniques:
if mergeIds:
o_merged_col[o_idx] = [view[idx].id for idx in merged_sequences]
if COUNT_COLUMN not in o_seq or o_seq[COUNT_COLUMN] is None:
o_seq[COUNT_COLUMN] = 1
if b"taxid_dist" in u_seq and i_taxid_dist_col[u_idx] is not None:
taxid_dist_dict = i_taxid_dist_col[u_idx]
else:
taxid_dist_dict = {}
for i_idx in merged_sequences:
i_id = i_id_col[i_idx]
i_seq = view[i_idx]
if COUNT_COLUMN not in i_seq or i_seq[COUNT_COLUMN] is None:
i_count = 1
else:
i_count = i_seq[COUNT_COLUMN]
u_idx = uniques[u_id]
o_seq = o_view[u_idx]
o_seq[COUNT_COLUMN] += i_count
for key in mergedKeys:
if key=='taxid' and mergeIds:
if 'taxid_dist' in i_seq:
taxid_dist_dict[o_seq.id].update(i_seq["taxid_dist"])
if 'taxid' in i_seq:
taxid_dist_dict[o_seq.id][i_seq.id] = i_seq['taxid']
merged_dict = {}
for mkey in mergedKeys_m:
merged_dict[mkey] = {}
for k in range(k_count):
key = mergedKeys[k]
mkey = mergedKeys_m[k]
if key==b"taxid" and mergeIds:
if b"taxid_dist" in i_seq:
taxid_dist_dict.update(i_taxid_dist_col[i_idx])
if b"taxid" in i_seq:
taxid_dist_dict[i_id] = i_taxid_col[i_idx]
mkey = "merged_%s" % key
#cas ou on met a jour les merged_keys mais il n'y a pas de merged_keys dans la sequence qui arrive
if key in i_seq:
to_merge = i_seq[key]
if type(to_merge) != bytes :
to_merge = str(to_merge)
mcol = o_seq[mkey]
if mcol[to_merge] is None:
mcol[to_merge] = i_count
else:
mcol[to_merge] = mcol[to_merge] + i_count
if to_merge is not None:
if type(to_merge) != bytes :
to_merge = tobytes(str(to_merge))
mcol = merged_dict[mkey]
if to_merge not in mcol or mcol[to_merge] is None:
mcol[to_merge] = i_count
else:
mcol[to_merge] = mcol[to_merge] + i_count
o_seq[key] = None
#cas ou merged_keys existe deja
else: # TODO is this a good else
if mkey in i_seq:
mcol = o_seq[mkey]
mcol = merged_dict[mkey]
i_mcol = i_seq[mkey]
for key_b in i_mcol:
if mcol[key_b] is None:
mcol[key_b] = i_mcol[key_b]
else:
mcol[key_b] = mcol[key_b] + i_mcol[key_b]
if i_mcol is not None:
for key2 in i_mcol:
if mcol[key2] is None:
mcol[key2] = i_mcol[key2]
else:
mcol[key2] = mcol[key2] + i_mcol[key2]
for key_b in i_seq.keys():
# Merger proprement l'attribut merged s'il existe
# Write taxid_dist
if mergeIds and b"taxid" in mergedKeys:
if b"taxid_dist" in str_merged_cols:
o_taxid_dist_col[o_idx] = str(taxid_dist_dict)
else:
o_taxid_dist_col[o_idx] = taxid_dist_dict
# Write merged dicts
for mkey in merged_dict:
if mkey in str_merged_cols:
mkey_cols[mkey][o_idx] = str(merged_dict[mkey])
else:
mkey_cols[mkey][o_idx] = merged_dict[mkey]
for key in i_seq.keys():
# Delete informations that differ between the merged sequences
# TODO make special columns list?
if key_b != COUNT_COLUMN and key_b != ID_COLUMN and key_b != NUC_SEQUENCE_COLUMN and key_b in o_seq and o_seq[key_b] != i_seq[key_b] :
o_seq[key_b] = None
if key != COUNT_COLUMN and key != ID_COLUMN and key != NUC_SEQUENCE_COLUMN and key in o_seq and o_seq[key] != i_seq[key] :
o_seq[key] = None
if mergeIds :
merged_ids_dict[o_seq.id].append(i_seq.id)
else:
o_view[o_idx] = i_seq
o_seq = o_view[o_idx]
uniques[u_id] = o_idx
o_idx += 1
if COUNT_COLUMN not in o_seq or o_seq[COUNT_COLUMN] is None:
o_seq[COUNT_COLUMN] = 1
for key in mergedKeys:
if key=='taxid' and mergeIds:
if 'taxid' in o_seq and 'taxid_dist' not in o_seq:
taxid_dist_dict[i_seq.id] = {}
if 'taxid' in o_seq:
taxid_dist_dict[i_seq.id][o_seq.id] = o_seq['taxid']
mkey = "merged_%s" % key
if key in o_seq:
to_merge = o_seq[key]
if type(to_merge) != bytes :
to_merge = str(to_merge)
mcol = o_seq[mkey]
if to_merge in mcol and mcol[to_merge] is not None:
mcol[to_merge] = mcol[to_merge] + o_seq[COUNT_COLUMN]
else:
mcol[to_merge] = o_seq[COUNT_COLUMN]
o_seq[key] = None # TODO delete column eventually -> make C function that deletes 'empty' columns
if mergeIds:
merged_ids_dict[o_seq.id] = [o_seq.id]
i+=1
# Merged ids column
if mergeIds :
logger("info", "Building merged ids column")
nb_ids_max = 0
ids_list = set()
for k in merged_ids_dict :
ids_list = set(list(ids_list) + merged_ids_dict[k])
n = len(merged_ids_dict[k])
if n > nb_ids_max :
nb_ids_max = n
Column.new_column(o_view,
"merged",
OBI_STR,
nb_elements_per_line=nb_ids_max,
elements_names=None,
comments="obi uniq merged ids",
alias="merged" # TODO what if it already exists
)
for o_seq in o_view:
o_seq['merged'] = merged_ids_dict[o_seq.id]
if mergeIds and 'taxid' in mergedKeys:
logger("info", "Building taxid dist column")
Column.new_column(o_view,
"taxid_dist",
OBI_INT,
nb_elements_per_line=len(ids_list),
elements_names=list(ids_list),
comments="obi uniq taxid dist",
alias="taxid_dist" # TODO what if it already exists
)
for o_seq in o_view:
if o_seq.id in taxid_dist_dict :
o_seq['taxid_dist'] = taxid_dist_dict[o_seq.id]
o_idx += 1
if taxonomy is not None:
logger("info", "Merging taxonomy classification")
merge_taxonomy_classification(o_view, taxonomy)
def run(config):
cdef tuple input
@ -417,12 +489,16 @@ def run(config):
input=False,
newviewtype=View_NUC_SEQS)
# TODO exceptions not handled like they should be
if output is None:
raise Exception("Could not create output view")
entries = input[1]
o_view = output[1]
if 'taxoURI' in config['obi'] : # TODO default None problem
taxo_uri = open_uri(config['obi']['taxoURI'])
if taxo_uri is None:
raise RollbackException("Couldn't open taxonomy, rollbacking view", o_view)
taxo = taxo_uri[1]
else :
taxo = None
@ -431,9 +507,9 @@ def run(config):
pb = ProgressBar(len(entries), config, seconde=5)
try:
uniq_sequences(entries, o_view, pb, mergedKeys_list=config['uniq']['merge'], taxonomy=taxo, mergeIds=config['uniq']['mergeids'], categories=config['uniq']['categories'])
except:
raise RollbackException("obi uniq error, rollbacking view", o_view)
uniq_sequences(entries, o_view, pb, mergedKeys_list=config['uniq']['merge'], taxonomy=taxo, mergeIds=config['uniq']['mergeids'], categories=config['uniq']['categories'], max_elts=config['obi']['maxelts'])
except Exception, e:
raise RollbackException("obi uniq error, rollbacking view: "+str(e), o_view)
print("\n")
print(repr(o_view))