obiclean parallelized

This commit is contained in:
Celine Mercier
2019-06-20 19:44:04 +02:00
parent 9b4c3537f9
commit 3d9f0352ff
3 changed files with 136 additions and 151 deletions

View File

@ -59,6 +59,13 @@ def addOptions(parser):
default=False, default=False,
help="Adds tags for each sequence giving its cluster's head and weight for each sample.") help="Adds tags for each sequence giving its cluster's head and weight for each sample.")
group.add_argument('--thread-count','-p', # TODO should probably be in a specific option group
action="store", dest="clean:thread-count",
metavar='<THREAD COUNT>',
default=-1,
type=int,
help="Number of threads to use for the computation. Default: the maximum available.")
def run(config): def run(config):
@ -101,7 +108,7 @@ def run(config):
comments = View.print_config(config, "clean", command_line, input_dms_name=[i_dms_name], input_view_name=[i_view_name]) comments = View.print_config(config, "clean", command_line, input_dms_name=[i_dms_name], input_view_name=[i_view_name])
if obi_clean(tobytes(i_dms_name), tobytes(i_view_name), tobytes(config['clean']['sample-tag-name']), tobytes(o_view_name), comments, \ if obi_clean(tobytes(i_dms_name), tobytes(i_view_name), tobytes(config['clean']['sample-tag-name']), tobytes(o_view_name), comments, \
config['clean']['distance'], config['clean']['ratio'], config['clean']['heads-only'], -1) < 0: config['clean']['distance'], config['clean']['ratio'], config['clean']['heads-only'], config['clean']['thread-count']) < 0:
raise Exception("Error running obiclean") raise Exception("Error running obiclean")
# If the input and output DMS are not the same, export result view to output DMS # If the input and output DMS are not the same, export result view to output DMS

View File

@ -160,6 +160,7 @@ int obi_clean(const char* dms_name,
{ {
char* o_view_name_temp = NULL; char* o_view_name_temp = NULL;
float p; float p;
index_t i;
index_t l; index_t l;
index_t k; index_t k;
index_t seq_count; index_t seq_count;
@ -173,6 +174,7 @@ int obi_clean(const char* dms_name,
int ind_sample_count; int ind_sample_count;
char status; char status;
int samp; int samp;
Obi_blob_p blob1;
byte_t* alignment_result_array = NULL; byte_t* alignment_result_array = NULL;
@ -201,9 +203,9 @@ int obi_clean(const char* dms_name,
#ifdef _OPENMP #ifdef _OPENMP
max_threads = omp_get_max_threads(); max_threads = omp_get_max_threads();
if ((thread_count == -1) || (thread_count > max_threads)) // TODO doc if ((thread_count == -1) || (thread_count > max_threads))
thread_count = max_threads; thread_count = max_threads;
omp_set_num_threads(4); omp_set_num_threads(thread_count);
fprintf(stderr, "Running on %d thread(s)\n", thread_count); fprintf(stderr, "Running on %d thread(s)\n", thread_count);
#endif #endif
@ -361,7 +363,6 @@ int obi_clean(const char* dms_name,
// Allocate alignment result array (byte at 0 if not aligned yet, // Allocate alignment result array (byte at 0 if not aligned yet,
// 1 if sequence at index has a similarity above the threshold with the current sequence, // 1 if sequence at index has a similarity above the threshold with the current sequence,
// 2 if sequence at index has a similarity below the threshold with the current sequence) // 2 if sequence at index has a similarity below the threshold with the current sequence)
//alignment_result_array = (byte_t*) calloc(thread_count*seq_count, sizeof(byte_t));
alignment_result_array = (byte_t*) calloc(seq_count, sizeof(byte_t)); alignment_result_array = (byte_t*) calloc(seq_count, sizeof(byte_t));
if (alignment_result_array == NULL) if (alignment_result_array == NULL)
{ {
@ -386,41 +387,30 @@ int obi_clean(const char* dms_name,
} }
} }
Obi_blob_p blob1;
index_t i;
byte_t* ali_result_array = alignment_result_array;
for (i=0; i< (seq_count-1); i++) for (i=0; i< (seq_count-1); i++)
{ {
if (i%1000 == 0) if (i%1000 == 0)
{ {
p = (i/(float)(seq_count/(float)thread_count))*100; p = (i/(float)seq_count)*100;
fprintf(stderr,"\rDone : %f %% ",p); fprintf(stderr,"\rDone : %f %% ",p);
} }
// Get first sequence // Get first sequence
blob1 = blob_array[i]; blob1 = blob_array[i];
// blob1 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0); // slower // blob1 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0); // slower
if (blob1 == NULL) if (blob1 == NULL)
{ {
obidebug(1, "\nError retrieving sequences to align"); obidebug(1, "\nError retrieving sequences to align");
stop = true; stop = true;
} }
#pragma omp parallel default(none) \ #pragma omp parallel default(none) \
shared(ali_result_array, thread_count, seq_count, blob_array, complete_sample_count_array, alignment_result_array, stop, blob1, i, \ shared(thread_count, seq_count, blob_array, complete_sample_count_array, alignment_result_array, \
obi_errno, stderr, max_ratio, iseq_column, i_view, similarity_mode, reference, normalize, threshold, ktable, status_column, o_view, sample_count) stop, blob1, i, obi_errno, stderr, max_ratio, iseq_column, i_view, \
//private(ali_result_array, thread_id, i, j, p, blob1, blob2, s1_count, s2_count, \ similarity_mode, reference, normalize, threshold, ktable, status_column, o_view, sample_count)
sample_count_array, sample, yes, no, above_threshold, ali_length, lcs_length, ali_result, score) {
{
//byte_t* ali_result_array = NULL;
int thread_id = 0;
index_t j; index_t j;
float p;
Obi_blob_p blob2; Obi_blob_p blob2;
int s1_count; int s1_count;
int s2_count; int s2_count;
@ -435,28 +425,18 @@ int obi_clean(const char* dms_name,
int ali_length; int ali_length;
byte_t ali_result; byte_t ali_result;
// #ifdef _OPENMP // Parallelize the loop on samples to avoid interdependency issues inside one sample
// thread_id = omp_get_thread_num(); #pragma omp for schedule(dynamic, (sample_count/thread_count))
// ali_result_array = alignment_result_array+thread_id;
// #else
// ali_result_array = alignment_result_array;
// #endif
#pragma omp for schedule(dynamic, sample_count/thread_count)
for (sample=0; sample < sample_count; sample++) for (sample=0; sample < sample_count; sample++)
{ {
sample_count_array = complete_sample_count_array+(sample*seq_count); sample_count_array = complete_sample_count_array+(sample*seq_count);
// Get count for this sample // Get count for this sample
s1_count = sample_count_array[i]; s1_count = sample_count_array[i];
//s1_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, i, sample); // slower //s1_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, i, sample); // slower
for (j=i+1; j < seq_count; j++) for (j=i+1; j < seq_count; j++)
{ {
//fprintf(stderr, "\nthread=%d, i=%d, sample=%d, j=%d", omp_get_thread_num(),i,sample,j);
// Get second sequence // Get second sequence
blob2 = blob_array[j]; blob2 = blob_array[j];
// blob2 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0); // slower // blob2 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0); // slower
@ -479,7 +459,7 @@ int obi_clean(const char* dms_name,
yes = 0; yes = 0;
no = 0; no = 0;
above_threshold = false; above_threshold = false;
ali_result = ali_result_array[j]; ali_result = alignment_result_array[j];
if (ali_result > 0) // already aligned if (ali_result > 0) // already aligned
{ {
if (ali_result == 2) if (ali_result == 2)
@ -509,12 +489,11 @@ int obi_clean(const char* dms_name,
{ {
if (yes == 0) if (yes == 0)
// Set ali result as above the threshold (value 1) // Set ali result as above the threshold (value 1)
ali_result_array[j] = 1; alignment_result_array[j] = 1;
// Might be worth having arrays to read values too for some datasets but unlikely // Might be worth having arrays to read values too for some datasets but unlikely
// label as head or internal // label as head or internal
#pragma omp critical
{
if (s1_count >= s2_count) if (s1_count >= s2_count)
{ {
if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample) == 's') // seq can become head ONLY if it's a singleton if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample) == 's') // seq can become head ONLY if it's a singleton
@ -537,20 +516,18 @@ int obi_clean(const char* dms_name,
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'i') < 0) if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'i') < 0)
stop = true; stop = true;
} }
}
} }
else if (no == 0) else if (no == 0)
// Set ali result as below the threshold (value 2) // Set ali result as below the threshold (value 2)
ali_result_array[j] = 2; alignment_result_array[j] = 2;
} }
} }
} }
// Reset ali result array to 0 // Reset ali result array to 0
memset(ali_result_array, 0, seq_count); memset(alignment_result_array, 0, seq_count);
} }
} }

View File

@ -61,7 +61,8 @@
* @param max_ratio Maximum ratio between the counts of two sequences so that the less abundant one can be considered * @param max_ratio Maximum ratio between the counts of two sequences so that the less abundant one can be considered
* as a variant of the more abundant one. * as a variant of the more abundant one.
* @param heads_only If true, only cluster heads are printed to the output view. * @param heads_only If true, only cluster heads are printed to the output view.
* @param thread_count Number of threads to use (Not available yet) TODO * @param thread_count Number of threads to use. If the number given is -1 or is greater than the maximum number of
* threads available, the maximum number of threads is detected and used.
* *
* @returns A value indicating the success of the operation. * @returns A value indicating the success of the operation.
* @retval 0 if the operation was successfully completed. * @retval 0 if the operation was successfully completed.