diff --git a/python/obitools3/commands/clean.pyx b/python/obitools3/commands/clean.pyx index b69fb23..f908294 100755 --- a/python/obitools3/commands/clean.pyx +++ b/python/obitools3/commands/clean.pyx @@ -59,6 +59,13 @@ def addOptions(parser): default=False, help="Adds tags for each sequence giving its cluster's head and weight for each sample.") + group.add_argument('--thread-count','-p', # TODO should probably be in a specific option group + action="store", dest="clean:thread-count", + metavar='', + default=-1, + type=int, + help="Number of threads to use for the computation. Default: the maximum available.") + def run(config): @@ -101,7 +108,7 @@ def run(config): comments = View.print_config(config, "clean", command_line, input_dms_name=[i_dms_name], input_view_name=[i_view_name]) if obi_clean(tobytes(i_dms_name), tobytes(i_view_name), tobytes(config['clean']['sample-tag-name']), tobytes(o_view_name), comments, \ - config['clean']['distance'], config['clean']['ratio'], config['clean']['heads-only'], -1) < 0: + config['clean']['distance'], config['clean']['ratio'], config['clean']['heads-only'], config['clean']['thread-count']) < 0: raise Exception("Error running obiclean") # If the input and output DMS are not the same, export result view to output DMS diff --git a/src/obi_clean.c b/src/obi_clean.c index 2eecab8..efc044f 100755 --- a/src/obi_clean.c +++ b/src/obi_clean.c @@ -160,6 +160,7 @@ int obi_clean(const char* dms_name, { char* o_view_name_temp = NULL; float p; + index_t i; index_t l; index_t k; index_t seq_count; @@ -173,6 +174,7 @@ int obi_clean(const char* dms_name, int ind_sample_count; char status; int samp; + Obi_blob_p blob1; byte_t* alignment_result_array = NULL; @@ -201,9 +203,9 @@ int obi_clean(const char* dms_name, #ifdef _OPENMP max_threads = omp_get_max_threads(); - if ((thread_count == -1) || (thread_count > max_threads)) // TODO doc + if ((thread_count == -1) || (thread_count > max_threads)) thread_count = max_threads; - omp_set_num_threads(4); + omp_set_num_threads(thread_count); fprintf(stderr, "Running on %d thread(s)\n", thread_count); #endif @@ -361,7 +363,6 @@ int obi_clean(const char* dms_name, // Allocate alignment result array (byte at 0 if not aligned yet, // 1 if sequence at index has a similarity above the threshold with the current sequence, // 2 if sequence at index has a similarity below the threshold with the current sequence) - //alignment_result_array = (byte_t*) calloc(thread_count*seq_count, sizeof(byte_t)); alignment_result_array = (byte_t*) calloc(seq_count, sizeof(byte_t)); if (alignment_result_array == NULL) { @@ -386,171 +387,147 @@ int obi_clean(const char* dms_name, } } - Obi_blob_p blob1; - index_t i; - byte_t* ali_result_array = alignment_result_array; + for (i=0; i< (seq_count-1); i++) + { - - for (i=0; i< (seq_count-1); i++) + if (i%1000 == 0) { + p = (i/(float)seq_count)*100; + fprintf(stderr,"\rDone : %f %% ",p); + } - if (i%1000 == 0) - { - p = (i/(float)(seq_count/(float)thread_count))*100; - fprintf(stderr,"\rDone : %f %% ",p); - } + // Get first sequence + blob1 = blob_array[i]; +// blob1 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0); // slower + if (blob1 == NULL) + { + obidebug(1, "\nError retrieving sequences to align"); + stop = true; + } - // Get first sequence - blob1 = blob_array[i]; - // blob1 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0); // slower - if (blob1 == NULL) - { - obidebug(1, "\nError retrieving sequences to align"); - stop = true; - } + #pragma omp parallel default(none) \ + shared(thread_count, seq_count, blob_array, complete_sample_count_array, alignment_result_array, \ + stop, blob1, i, obi_errno, stderr, max_ratio, iseq_column, i_view, \ + similarity_mode, reference, normalize, threshold, ktable, status_column, o_view, sample_count) + { + index_t j; + Obi_blob_p blob2; + int s1_count; + int s2_count; + int* sample_count_array; + int sample; + byte_t no; + byte_t yes; + double score; + int lcs_min; + bool above_threshold; + int lcs_length; + int ali_length; + byte_t ali_result; -#pragma omp parallel default(none) \ - shared(ali_result_array, thread_count, seq_count, blob_array, complete_sample_count_array, alignment_result_array, stop, blob1, i, \ - obi_errno, stderr, max_ratio, iseq_column, i_view, similarity_mode, reference, normalize, threshold, ktable, status_column, o_view, sample_count) - //private(ali_result_array, thread_id, i, j, p, blob1, blob2, s1_count, s2_count, \ - sample_count_array, sample, yes, no, above_threshold, ali_length, lcs_length, ali_result, score) - -{ - - //byte_t* ali_result_array = NULL; - int thread_id = 0; - index_t j; - float p; - Obi_blob_p blob2; - int s1_count; - int s2_count; - int* sample_count_array; - int sample; - byte_t no; - byte_t yes; - double score; - int lcs_min; - bool above_threshold; - int lcs_length; - int ali_length; - byte_t ali_result; - -// #ifdef _OPENMP -// thread_id = omp_get_thread_num(); -// ali_result_array = alignment_result_array+thread_id; -// #else -// ali_result_array = alignment_result_array; -// #endif - - #pragma omp for schedule(dynamic, sample_count/thread_count) + // Parallelize the loop on samples to avoid interdependency issues inside one sample + #pragma omp for schedule(dynamic, (sample_count/thread_count)) for (sample=0; sample < sample_count; sample++) { - sample_count_array = complete_sample_count_array+(sample*seq_count); // Get count for this sample s1_count = sample_count_array[i]; //s1_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, i, sample); // slower - - for (j=i+1; j < seq_count; j++) + for (j=i+1; j < seq_count; j++) + { + // Get second sequence + blob2 = blob_array[j]; + // blob2 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0); // slower + if (blob2 == NULL) { - - //fprintf(stderr, "\nthread=%d, i=%d, sample=%d, j=%d", omp_get_thread_num(),i,sample,j); - // Get second sequence - blob2 = blob_array[j]; - // blob2 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0); // slower - if (blob2 == NULL) - { - obidebug(1, "\nError retrieving sequences to align"); - stop = true; - } - - // Get count for this sample - s2_count = sample_count_array[j]; - //s2_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, j, sample); // slower - - // Check all ratios - if (((s1_count!=OBIInt_NA && s2_count!=OBIInt_NA) && (s1_count>0 && s2_count>0)) && - ((((s1_count >= s2_count) && (((double) s2_count / (double) s1_count) <= max_ratio))) || - (((s2_count >= s1_count) && (((double) s1_count / (double) s2_count) <= max_ratio))))) - { - - yes = 0; - no = 0; - above_threshold = false; - ali_result = ali_result_array[j]; - if (ali_result > 0) // already aligned - { - if (ali_result == 2) - no = 1; - else if (ali_result == 1) - yes = 1; - } - else // never compared before - { - // Check if the sequences are identical in a quick way (same index in the same indexer) - if (obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0) == obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0)) - above_threshold = true; - else // the sequences aren't identical - { - // kmer filter - align_filters(ktable, blob1, blob2, i, j, threshold, normalize, reference, similarity_mode, &score, &lcs_min, false); - - // Compute alignment score if filter passed - if (score == -1.0) - score = obiblob_sse_banded_lcs_align(blob1, blob2, threshold, normalize, reference, similarity_mode, &lcs_length, &ali_length); - - above_threshold = ((score >= 0) && (score <= threshold)); - } - } - - if (yes || above_threshold) - { - if (yes == 0) - // Set ali result as above the threshold (value 1) - ali_result_array[j] = 1; - - // Might be worth having arrays to read values too for some datasets but unlikely - // label as head or internal - #pragma omp critical - { - if (s1_count >= s2_count) - { - if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample) == 's') // seq can become head ONLY if it's a singleton - { - if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'h') < 0) - stop = true; - } - // Otherwise it's an internal (do nothing) - // Label other sequence as internal no matter what - if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'i') < 0) - stop = true; - } - else // Same thing but with sequences switched - { - if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample) == 's') // seq can become head ONLY if it's a singleton - { - if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'h') < 0) - stop = true; - } - if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'i') < 0) - stop = true; - } - } - } - - else if (no == 0) - // Set ali result as above the threshold (value 2) - ali_result_array[j] = 2; - } + obidebug(1, "\nError retrieving sequences to align"); + stop = true; } + // Get count for this sample + s2_count = sample_count_array[j]; + //s2_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, j, sample); // slower + + // Check all ratios + if (((s1_count!=OBIInt_NA && s2_count!=OBIInt_NA) && (s1_count>0 && s2_count>0)) && + ((((s1_count >= s2_count) && (((double) s2_count / (double) s1_count) <= max_ratio))) || + (((s2_count >= s1_count) && (((double) s1_count / (double) s2_count) <= max_ratio))))) + { + + yes = 0; + no = 0; + above_threshold = false; + ali_result = alignment_result_array[j]; + if (ali_result > 0) // already aligned + { + if (ali_result == 2) + no = 1; + else if (ali_result == 1) + yes = 1; + } + else // never compared before + { + // Check if the sequences are identical in a quick way (same index in the same indexer) + if (obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0) == obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0)) + above_threshold = true; + else // the sequences aren't identical + { + // kmer filter + align_filters(ktable, blob1, blob2, i, j, threshold, normalize, reference, similarity_mode, &score, &lcs_min, false); + + // Compute alignment score if filter passed + if (score == -1.0) + score = obiblob_sse_banded_lcs_align(blob1, blob2, threshold, normalize, reference, similarity_mode, &lcs_length, &ali_length); + + above_threshold = ((score >= 0) && (score <= threshold)); + } + } + + if (yes || above_threshold) + { + if (yes == 0) + // Set ali result as above the threshold (value 1) + alignment_result_array[j] = 1; + + // Might be worth having arrays to read values too for some datasets but unlikely + // label as head or internal + + if (s1_count >= s2_count) + { + if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample) == 's') // seq can become head ONLY if it's a singleton + { + if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'h') < 0) + stop = true; + } + // Otherwise it's an internal (do nothing) + // Label other sequence as internal no matter what + if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'i') < 0) + stop = true; + } + else // Same thing but with sequences switched + { + if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample) == 's') // seq can become head ONLY if it's a singleton + { + if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'h') < 0) + stop = true; + } + if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'i') < 0) + stop = true; + } + + } + + else if (no == 0) + // Set ali result as above the threshold (value 2) + alignment_result_array[j] = 2; + } + } } // Reset ali result array to 0 - memset(ali_result_array, 0, seq_count); - + memset(alignment_result_array, 0, seq_count); } } diff --git a/src/obi_clean.h b/src/obi_clean.h index 4be87e0..35011c8 100755 --- a/src/obi_clean.h +++ b/src/obi_clean.h @@ -61,7 +61,8 @@ * @param max_ratio Maximum ratio between the counts of two sequences so that the less abundant one can be considered * as a variant of the more abundant one. * @param heads_only If true, only cluster heads are printed to the output view. - * @param thread_count Number of threads to use (Not available yet) TODO + * @param thread_count Number of threads to use. If the number given is -1 or is greater than the maximum number of + * threads available, the maximum number of threads is detected and used. * * @returns A value indicating the success of the operation. * @retval 0 if the operation was successfully completed.