multithreaded obiclean working but not cleaned

This commit is contained in:
Celine Mercier
2019-06-19 17:29:58 +02:00
parent fd0b7a9177
commit 9b4c3537f9

View File

@ -160,33 +160,23 @@ int obi_clean(const char* dms_name,
{ {
char* o_view_name_temp = NULL; char* o_view_name_temp = NULL;
float p; float p;
index_t i, j, l; index_t l;
index_t k;
index_t seq_count; index_t seq_count;
index_t* line_selection = NULL; index_t* line_selection = NULL;
double score;
bool above_threshold;
int lcs_length;
int ali_length;
Kmer_table_p ktable; Kmer_table_p ktable;
Obi_blob_p blob1;
Obi_blob_p blob2;
int lcs_min;
int sample_count; int sample_count;
int sample;
int s1_count;
int s2_count;
bool head; bool head;
int head_count; int head_count;
int internal_count; int internal_count;
int singleton_count; int singleton_count;
int ind_sample_count; int ind_sample_count;
char status; char status;
int samp;
byte_t* alignment_result_array = NULL; byte_t* alignment_result_array = NULL;
byte_t ali_result;
int* complete_sample_count_array = NULL; int* complete_sample_count_array = NULL;
int* sample_count_array = NULL;
Obi_blob_p* blob_array = NULL; Obi_blob_p* blob_array = NULL;
OBIDMS_p dms = NULL; OBIDMS_p dms = NULL;
@ -201,11 +191,8 @@ int obi_clean(const char* dms_name,
OBIDMS_column_p singletoncount_column = NULL; OBIDMS_column_p singletoncount_column = NULL;
OBIDMS_column_p samplecount_column = NULL; OBIDMS_column_p samplecount_column = NULL;
byte_t no;
byte_t yes;
bool normalize = false; bool normalize = false;
int reference = 0; int reference = 0;
bool similarity_mode = false; bool similarity_mode = false;
bool stop = false; bool stop = false;
@ -352,11 +339,10 @@ int obi_clean(const char* dms_name,
obidebug(1, "\nError allocating memory for the array of sample counts, size: %lld", seq_count * sample_count * sizeof(int)); obidebug(1, "\nError allocating memory for the array of sample counts, size: %lld", seq_count * sample_count * sizeof(int));
return -1; return -1;
} }
for (sample=0; sample < sample_count; sample++) for (samp=0; samp < sample_count; samp++)
{ {
sample_count_array = complete_sample_count_array+(sample*seq_count); for (k=0; k<seq_count; k++)
for (i=0; i<seq_count; i++) complete_sample_count_array[k+(samp*seq_count)] = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, k, samp);
sample_count_array[i] = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, i, sample);
} }
// Allocate arrays for blobs otherwise reading in mapped files takes longer // Allocate arrays for blobs otherwise reading in mapped files takes longer
@ -367,14 +353,15 @@ int obi_clean(const char* dms_name,
obidebug(1, "\nError allocating memory for the array of blobs"); obidebug(1, "\nError allocating memory for the array of blobs");
return -1; return -1;
} }
for (i=0; i<seq_count; i++) for (k=0; k<seq_count; k++)
{ {
blob_array[i] = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0); blob_array[k] = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, k, 0);
} }
// Allocate alignment result array (byte at 0 if not aligned yet, // Allocate alignment result array (byte at 0 if not aligned yet,
// 1 if sequence at index has a similarity above the threshold with the current sequence, // 1 if sequence at index has a similarity above the threshold with the current sequence,
// 2 if sequence at index has a similarity below the threshold with the current sequence) // 2 if sequence at index has a similarity below the threshold with the current sequence)
//alignment_result_array = (byte_t*) calloc(thread_count*seq_count, sizeof(byte_t));
alignment_result_array = (byte_t*) calloc(seq_count, sizeof(byte_t)); alignment_result_array = (byte_t*) calloc(seq_count, sizeof(byte_t));
if (alignment_result_array == NULL) if (alignment_result_array == NULL)
{ {
@ -384,13 +371,13 @@ int obi_clean(const char* dms_name,
} }
// Initialize all sequences to singletons or NA if no sequences in that sample // Initialize all sequences to singletons or NA if no sequences in that sample
for (i=0; i<seq_count; i++) for (k=0; k<seq_count; k++)
{ {
for (sample=0; sample < sample_count; sample++) for (samp=0; samp < sample_count; samp++)
{ {
if (obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, i, sample) != OBIInt_NA) // Only initialize samples where there are some sequences if (obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, k, samp) != OBIInt_NA) // Only initialize samples where there are some sequences
{ {
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 's') < 0) if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, k, samp, 's') < 0)
{ {
obidebug(1, "\nError initializing all sequences to singletons"); obidebug(1, "\nError initializing all sequences to singletons");
return -1; return -1;
@ -399,131 +386,172 @@ int obi_clean(const char* dms_name,
} }
} }
Obi_blob_p blob1;
index_t i;
byte_t* ali_result_array = alignment_result_array;
for (i=0; i< (seq_count-1); i++)
{ for (i=0; i< (seq_count-1); i++)
if (i%1000 == 0)
{ {
p = (i/(float)seq_count)*100;
fprintf(stderr,"\rDone : %f %% ",p);
}
// Get first sequence if (i%1000 == 0)
blob1 = blob_array[i];
// blob1 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0); // slower
if (blob1 == NULL)
{
obidebug(1, "\nError retrieving sequences to align");
return -1;
}
for (sample=0; sample < sample_count; sample++)
{
sample_count_array = complete_sample_count_array+(sample*seq_count);
// Get count for this sample
s1_count = sample_count_array[i];
//s1_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, i, sample); // slower
#pragma omp parallel shared(i, seq_count, s1_count, sample, blob_array, sample_count_array, alignment_result_array, stop) \
private(j, blob2, s2_count, yes, no, above_threshold, ali_result, score)
{ {
#pragma omp for schedule(dynamic, 100) p = (i/(float)(seq_count/(float)thread_count))*100;
for (j=i+1; j < seq_count; j++) fprintf(stderr,"\rDone : %f %% ",p);
{ }
// Get second sequence
blob2 = blob_array[j];
// blob2 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0); // slower
if (blob2 == NULL)
{
obidebug(1, "\nError retrieving sequences to align");
stop = true;
}
// Get count for this sample // Get first sequence
s2_count = sample_count_array[j]; blob1 = blob_array[i];
//s2_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, j, sample); // slower // blob1 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0); // slower
if (blob1 == NULL)
{
obidebug(1, "\nError retrieving sequences to align");
stop = true;
}
// Check all ratios #pragma omp parallel default(none) \
if (((s1_count!=OBIInt_NA && s2_count!=OBIInt_NA) && (s1_count>0 && s2_count>0)) && shared(ali_result_array, thread_count, seq_count, blob_array, complete_sample_count_array, alignment_result_array, stop, blob1, i, \
((((s1_count >= s2_count) && (((double) s2_count / (double) s1_count) <= max_ratio))) || obi_errno, stderr, max_ratio, iseq_column, i_view, similarity_mode, reference, normalize, threshold, ktable, status_column, o_view, sample_count)
(((s2_count >= s1_count) && (((double) s1_count / (double) s2_count) <= max_ratio))))) //private(ali_result_array, thread_id, i, j, p, blob1, blob2, s1_count, s2_count, \
sample_count_array, sample, yes, no, above_threshold, ali_length, lcs_length, ali_result, score)
{
//byte_t* ali_result_array = NULL;
int thread_id = 0;
index_t j;
float p;
Obi_blob_p blob2;
int s1_count;
int s2_count;
int* sample_count_array;
int sample;
byte_t no;
byte_t yes;
double score;
int lcs_min;
bool above_threshold;
int lcs_length;
int ali_length;
byte_t ali_result;
// #ifdef _OPENMP
// thread_id = omp_get_thread_num();
// ali_result_array = alignment_result_array+thread_id;
// #else
// ali_result_array = alignment_result_array;
// #endif
#pragma omp for schedule(dynamic, sample_count/thread_count)
for (sample=0; sample < sample_count; sample++)
{
sample_count_array = complete_sample_count_array+(sample*seq_count);
// Get count for this sample
s1_count = sample_count_array[i];
//s1_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, i, sample); // slower
for (j=i+1; j < seq_count; j++)
{ {
yes = 0; //fprintf(stderr, "\nthread=%d, i=%d, sample=%d, j=%d", omp_get_thread_num(),i,sample,j);
no = 0; // Get second sequence
above_threshold = false; blob2 = blob_array[j];
ali_result = alignment_result_array[j]; // blob2 = obi_get_blob_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0); // slower
if (ali_result > 0) // already aligned if (blob2 == NULL)
{ {
if (ali_result == 2) obidebug(1, "\nError retrieving sequences to align");
no = 1; stop = true;
else if (ali_result == 1)
yes = 1;
} }
else // never compared before
// Get count for this sample
s2_count = sample_count_array[j];
//s2_count = obi_get_int_with_elt_idx_and_col_p_in_view(i_view, sample_column, j, sample); // slower
// Check all ratios
if (((s1_count!=OBIInt_NA && s2_count!=OBIInt_NA) && (s1_count>0 && s2_count>0)) &&
((((s1_count >= s2_count) && (((double) s2_count / (double) s1_count) <= max_ratio))) ||
(((s2_count >= s1_count) && (((double) s1_count / (double) s2_count) <= max_ratio)))))
{ {
// Check if the sequences are identical in a quick way (same index in the same indexer)
if (obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0) == obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0)) yes = 0;
above_threshold = true; no = 0;
else // the sequences aren't identical above_threshold = false;
ali_result = ali_result_array[j];
if (ali_result > 0) // already aligned
{ {
// kmer filter if (ali_result == 2)
align_filters(ktable, blob1, blob2, i, j, threshold, normalize, reference, similarity_mode, &score, &lcs_min, false); no = 1;
else if (ali_result == 1)
// Compute alignment score if filter passed yes = 1;
if (score == -1.0)
score = obiblob_sse_banded_lcs_align(blob1, blob2, threshold, normalize, reference, similarity_mode, &lcs_length, &ali_length);
above_threshold = ((score >= 0) && (score <= threshold));
} }
} else // never compared before
if (yes || above_threshold)
{
if (yes == 0)
// Set ali result as above the threshold (value 1)
alignment_result_array[j] = 1;
// Might be worth having arrays to read values too for some datasets but unlikely
// label as head or internal
#pragma omp critical
{ {
if (s1_count >= s2_count) // Check if the sequences are identical in a quick way (same index in the same indexer)
if (obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, i, 0) == obi_get_index_with_elt_idx_and_col_p_in_view(i_view, iseq_column, j, 0))
above_threshold = true;
else // the sequences aren't identical
{ {
if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample) == 's') // seq can become head ONLY if it's a singleton // kmer filter
{ align_filters(ktable, blob1, blob2, i, j, threshold, normalize, reference, similarity_mode, &score, &lcs_min, false);
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'h') < 0)
stop = true; // Compute alignment score if filter passed
} if (score == -1.0)
// Otherwise it's an internal (do nothing) score = obiblob_sse_banded_lcs_align(blob1, blob2, threshold, normalize, reference, similarity_mode, &lcs_length, &ali_length);
// Label other sequence as internal no matter what
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'i') < 0) above_threshold = ((score >= 0) && (score <= threshold));
stop = true;
} }
else // Same thing but with sequences switched }
if (yes || above_threshold)
{
if (yes == 0)
// Set ali result as above the threshold (value 1)
ali_result_array[j] = 1;
// Might be worth having arrays to read values too for some datasets but unlikely
// label as head or internal
#pragma omp critical
{ {
if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample) == 's') // seq can become head ONLY if it's a singleton if (s1_count >= s2_count)
{
if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample) == 's') // seq can become head ONLY if it's a singleton
{ {
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'h') < 0) if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'h') < 0)
stop = true; stop = true;
} }
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'i') < 0) // Otherwise it's an internal (do nothing)
stop = true; // Label other sequence as internal no matter what
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'i') < 0)
stop = true;
}
else // Same thing but with sequences switched
{
if (obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample) == 's') // seq can become head ONLY if it's a singleton
{
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, j, sample, 'h') < 0)
stop = true;
}
if (obi_set_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample, 'i') < 0)
stop = true;
}
} }
} }
else if (no == 0)
// Set ali result as above the threshold (value 2)
ali_result_array[j] = 2;
} }
else if (no == 0)
// Set ali result as above the threshold (value 2)
alignment_result_array[j] = 2;
} }
}
} }
if (stop)
return -1; // Reset ali result array to 0
memset(ali_result_array, 0, seq_count);
} }
// Reset ali result array to 0
memset(alignment_result_array, 0, seq_count);
} }
free_kmer_tables(ktable, seq_count); free_kmer_tables(ktable, seq_count);
@ -533,6 +561,9 @@ int obi_clean(const char* dms_name,
fprintf(stderr, "\n"); fprintf(stderr, "\n");
if (stop)
return -1;
if (heads_only) if (heads_only)
{ {
line_selection = malloc((o_view->infos)->line_count * sizeof(index_t)); line_selection = malloc((o_view->infos)->line_count * sizeof(index_t));
@ -545,11 +576,11 @@ int obi_clean(const char* dms_name,
l=0; l=0;
} }
for (i=0; i<seq_count; i++) for (k=0; k<seq_count; k++)
{ {
if (i%1000 == 0) if (k%1000 == 0)
{ {
p = (i/(float)(seq_count))*100; p = (k/(float)(seq_count))*100;
fprintf(stderr, "\rAnnotating : %f %% ",p); fprintf(stderr, "\rAnnotating : %f %% ",p);
} }
@ -559,16 +590,16 @@ int obi_clean(const char* dms_name,
singleton_count = 0; singleton_count = 0;
ind_sample_count = 0; ind_sample_count = 0;
for (sample=0; sample < sample_count; sample++) for (samp=0; samp < sample_count; samp++)
{ {
// Check if head or singleton in at least one sample // Check if head or singleton in at least one sample
status = obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, i, sample); status = obi_get_char_with_elt_idx_and_col_p_in_view(o_view, status_column, k, samp);
if ((!head) && ((status == 'h') || (status == 's'))) if ((!head) && ((status == 'h') || (status == 's')))
{ {
head = true; head = true;
if (heads_only) if (heads_only)
{ {
line_selection[l] = i; line_selection[l] = k;
l++; l++;
} }
} }
@ -593,15 +624,15 @@ int obi_clean(const char* dms_name,
if (!heads_only || (heads_only && head)) // Label only if sequence is going to be kept in final view if (!heads_only || (heads_only && head)) // Label only if sequence is going to be kept in final view
{ {
if (obi_set_bool_with_elt_idx_and_col_p_in_view(o_view, head_column, i, 0, head) < 0) if (obi_set_bool_with_elt_idx_and_col_p_in_view(o_view, head_column, k, 0, head) < 0)
return -1; return -1;
if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, singletoncount_column, i, 0, singleton_count) < 0) if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, singletoncount_column, k, 0, singleton_count) < 0)
return -1; return -1;
if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, internalcount_column, i, 0, internal_count) < 0) if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, internalcount_column, k, 0, internal_count) < 0)
return -1; return -1;
if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, headcount_column, i, 0, head_count) < 0) if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, headcount_column, k, 0, head_count) < 0)
return -1; return -1;
if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, samplecount_column, i, 0, ind_sample_count) < 0) if (obi_set_int_with_elt_idx_and_col_p_in_view(o_view, samplecount_column, k, 0, ind_sample_count) < 0)
return -1; return -1;
} }
} }