diff --git a/pkg/obiformats/fastseq_json_header.go b/pkg/obiformats/fastseq_json_header.go index ebdcab4..8006c53 100644 --- a/pkg/obiformats/fastseq_json_header.go +++ b/pkg/obiformats/fastseq_json_header.go @@ -280,7 +280,7 @@ func _parse_json_header_(header string, sequence *obiseq.BioSequence) string { if err != nil { log.Fatalf("%s: Cannot parse merged slot %s: %v", sequence.Id(), skey, err) } else { - annotations[skey] = data + annotations[skey] = obiseq.MapAsStatsOnValues(data) } } else { log.Fatalf("%s: Cannot parse merged slot %s", sequence.Id(), skey) diff --git a/pkg/obiformats/fastseq_obi_header.go b/pkg/obiformats/fastseq_obi_header.go index f8e2e3c..79a45b1 100644 --- a/pkg/obiformats/fastseq_obi_header.go +++ b/pkg/obiformats/fastseq_obi_header.go @@ -216,11 +216,14 @@ func ParseOBIFeatures(text string, annotations obiseq.Annotation) string { j = __obi_header_map_int_key__.ReplaceAll(j, []byte(`$1"$2":`)) var err error switch { - case strings.HasPrefix(key, "merged_") || - strings.HasSuffix(key, "_count"): + case strings.HasSuffix(key, "_count"): dict := make(map[string]int) err = json.Unmarshal(j, &dict) value = dict + case strings.HasPrefix(key, "merged_"): + dict := make(map[string]int) + err = json.Unmarshal(j, &dict) + value = obiseq.MapAsStatsOnValues(dict) case strings.HasSuffix(key, "_status") || strings.HasSuffix(key, "_mutation"): dict := make(map[string]string) @@ -313,10 +316,20 @@ func WriteFastSeqOBIHeade(buffer *bytes.Buffer, sequence *obiseq.BioSequence) { switch t := value.(type) { case string: buffer.WriteString(fmt.Sprintf("%s=%s; ", key, t)) + case *obiseq.StatsOnValues: + t.RLock() + tv, err := obiutils.JsonMarshal(t.Map()) + t.RUnlock() + if err != nil { + log.Fatalf("Cannot convert %v value", value) + } + tv = bytes.ReplaceAll(tv, []byte(`"`), []byte("'")) + buffer.WriteString(fmt.Sprintf("%s=", key)) + buffer.Write(tv) + buffer.WriteString("; ") case map[string]int, map[string]string, - map[string]interface{}, - obiseq.StatsOnValues: + map[string]interface{}: tv, err := obiutils.JsonMarshal(t) if err != nil { log.Fatalf("Cannot convert %v value", value) diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index aa539bb..4bb7230 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -8,7 +8,7 @@ import ( // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "efc3f3a" +var _Commit = "8a2bb1f" var _Version = "Release 4.4.0" // Version returns the version of the obitools package. diff --git a/pkg/obiseq/merge.go b/pkg/obiseq/merge.go index 15fa902..5d2bcc0 100644 --- a/pkg/obiseq/merge.go +++ b/pkg/obiseq/merge.go @@ -5,12 +5,18 @@ import ( "math" "reflect" "strings" + "sync" + + "github.com/goccy/go-json" - "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" log "github.com/sirupsen/logrus" ) -type StatsOnValues map[string]int +type StatsOnValues struct { + counts map[string]int + lock sync.RWMutex +} + type StatsOnWeights func(sequence *BioSequence) int type StatsOnDescription struct { Name string @@ -51,6 +57,108 @@ func MakeStatsOnDescription(descriptor string) StatsOnDescription { var _merge_prefix = "merged_" +func NewStatOnValues() *StatsOnValues { + v := StatsOnValues{ + counts: make(map[string]int), + lock: sync.RWMutex{}, + } + + return &v +} + +func MapAsStatsOnValues(m map[string]int) *StatsOnValues { + v := StatsOnValues{ + counts: m, + lock: sync.RWMutex{}, + } + + return &v + +} +func (sov *StatsOnValues) RLock() { + sov.lock.RLock() +} + +func (sov *StatsOnValues) RUnlock() { + sov.lock.RUnlock() +} + +func (sov *StatsOnValues) Lock() { + sov.lock.Lock() +} + +func (sov *StatsOnValues) Unlock() { + sov.lock.Unlock() +} + +func (sov *StatsOnValues) Get(key string) (int, bool) { + if sov == nil { + return -1, false + } + + sov.RLock() + defer sov.RUnlock() + v, ok := sov.counts[key] + if !ok { + v = 0 + } + return v, ok +} + +func (sov *StatsOnValues) Map() map[string]int { + return sov.counts +} + +func (sov *StatsOnValues) Set(key string, value int) { + if sov == nil { + return + } + + sov.Lock() + defer sov.Unlock() + sov.counts[key] = value +} + +func (sov *StatsOnValues) Add(key string, value int) int { + if sov == nil { + return -1 + } + + sov.Lock() + defer sov.Unlock() + + v, ok := sov.counts[key] + if !ok { + v = 0 + } + v += value + sov.counts[key] = v + + return v +} + +func (sov *StatsOnValues) Len() int { + sov.RLock() + defer sov.RUnlock() + return len(sov.counts) +} + +func (sov *StatsOnValues) Keys() []string { + v := make([]string, 0, sov.Len()) + sov.RLock() + defer sov.RUnlock() + for k := range sov.counts { + v = append(v, k) + } + return v +} + +func (sov *StatsOnValues) MarshalJSON() ([]byte, error) { + sov.RLock() + defer sov.RUnlock() + return json.Marshal(sov.Map()) +} + // StatsOnSlotName returns the name of the slot that summarizes statistics of occurrence for a given attribute. // // Parameters: @@ -89,41 +197,24 @@ func (sequence *BioSequence) HasStatsOn(key string) bool { // // Return type: // - StatsOnValues -func (sequence *BioSequence) StatsOn(desc StatsOnDescription, na string) StatsOnValues { +func (sequence *BioSequence) StatsOn(desc StatsOnDescription, na string) *StatsOnValues { + var stats *StatsOnValues + var newstat bool + mkey := StatsOnSlotName(desc.Name) istat, ok := sequence.GetAttribute(mkey) - var stats StatsOnValues - var newstat bool - - if ok { - switch istat := istat.(type) { - case StatsOnValues: - stats = istat - newstat = false - case map[string]int: - stats = istat - newstat = false - case map[string]interface{}: - stats = make(StatsOnValues, len(istat)) - newstat = false - var err error - for k, v := range istat { - stats[k], err = obiutils.InterfaceToInt(v) - if err != nil { - log.Panicf("In sequence %s : %s stat tag not only containing integer values %s", - sequence.Id(), mkey, istat) - } - } - default: - stats = make(StatsOnValues) - sequence.SetAttribute(mkey, stats) - newstat = true - } - } else { - stats = make(StatsOnValues) + if !ok { + stats = NewStatOnValues() sequence.SetAttribute(mkey, stats) newstat = true + } else { + stats, ok = istat.(*StatsOnValues) + + if !ok { + log.Panicf("In sequence %s : %s is not a StatsOnValues type %T", sequence.Id(), mkey, istat) + } + newstat = false } if newstat { @@ -174,17 +265,7 @@ func (sequence *BioSequence) StatsPlusOne(desc StatsOnDescription, toAdd *BioSeq } - dw := desc.Weight(toAdd) - sequence.annot_lock.Lock() - old, ok := stats[sval] - if !ok { - old = 0 - } - - stats[sval] = old + dw - sequence.annot_lock.Unlock() - - sequence.SetAttribute(StatsOnSlotName(desc.Name), stats) // TODO: check if this is necessary + stats.Add(sval, desc.Weight(toAdd)) return retval } @@ -192,13 +273,18 @@ func (sequence *BioSequence) StatsPlusOne(desc StatsOnDescription, toAdd *BioSeq // // It takes a parameter `toMerged` of type StatsOnValues, which represents the StatsOnValues to be merged. // It returns a value of type StatsOnValues, which represents the merged StatsOnValues. -func (stats StatsOnValues) Merge(toMerged StatsOnValues) StatsOnValues { - for k, val := range toMerged { - old, ok := stats[k] +func (stats *StatsOnValues) Merge(toMerged *StatsOnValues) *StatsOnValues { + toMerged.RLock() + defer toMerged.RUnlock() + stats.Lock() + defer stats.Unlock() + + for k, val := range toMerged.counts { + old, ok := stats.counts[k] if !ok { old = 0 } - stats[k] = old + val + stats.counts[k] = old + val } return stats diff --git a/pkg/obiseq/predicate.go b/pkg/obiseq/predicate.go index 1258f2b..72b9fd2 100644 --- a/pkg/obiseq/predicate.go +++ b/pkg/obiseq/predicate.go @@ -201,7 +201,7 @@ func OccurInAtleast(sample string, n int) SequencePredicate { desc := MakeStatsOnDescription(sample) f := func(sequence *BioSequence) bool { stats := sequence.StatsOn(desc, "NA") - return len(stats) >= n + return stats.Len() >= n } return f diff --git a/pkg/obiseq/taxonomy_lca.go b/pkg/obiseq/taxonomy_lca.go index 4e059c7..b079c67 100644 --- a/pkg/obiseq/taxonomy_lca.go +++ b/pkg/obiseq/taxonomy_lca.go @@ -11,11 +11,13 @@ import ( func (sequence *BioSequence) TaxonomicDistribution(taxonomy *obitax.Taxonomy) map[*obitax.TaxNode]int { taxids := sequence.StatsOn(MakeStatsOnDescription("taxid"), "na") - taxons := make(map[*obitax.TaxNode]int, len(taxids)) + taxons := make(map[*obitax.TaxNode]int, taxids.Len()) taxonomy = taxonomy.OrDefault(true) - for taxid, v := range taxids { + taxids.RLock() + defer taxids.RUnlock() + for taxid, v := range taxids.Map() { t, isAlias, err := taxonomy.Taxon(taxid) if err != nil { log.Fatalf( diff --git a/pkg/obitools/obiclean/obiclean.go b/pkg/obitools/obiclean/obiclean.go index 8110bab..43e8c46 100644 --- a/pkg/obitools/obiclean/obiclean.go +++ b/pkg/obitools/obiclean/obiclean.go @@ -2,7 +2,6 @@ package obiclean import ( "fmt" - "maps" "os" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" @@ -38,7 +37,9 @@ func buildSamples(dataset obiseq.BioSequenceSlice, for _, s := range dataset { stats := s.StatsOn(obiseq.MakeStatsOnDescription(tag), NAValue) - for k, v := range stats { + stats.RLock() + defer stats.RUnlock() + for k, v := range stats.Map() { pcr, ok := samples[k] if !ok { @@ -123,9 +124,10 @@ func NotAlwaysChimera(tag string) obiseq.SequencePredicate { if !ok || len(chimera) == 0 { return true } - samples := maps.Keys(sequence.StatsOn(descriptor, "NA")) - for s := range samples { + samples := sequence.StatsOn(descriptor, "NA").Keys() + + for _, s := range samples { if _, ok := chimera[s]; !ok { return true } diff --git a/pkg/obitools/obiconsensus/obiconsensus.go b/pkg/obitools/obiconsensus/obiconsensus.go index 046c663..e8694a6 100644 --- a/pkg/obitools/obiconsensus/obiconsensus.go +++ b/pkg/obitools/obiconsensus/obiconsensus.go @@ -151,7 +151,7 @@ func SampleWeight(seqs *obiseq.BioSequenceSlice, sample, sample_key string) func log.Panicf("Sample %s not found in sequence %d", sample, i) } - if value, ok := stats[sample]; ok { + if value, ok := stats.Get(sample); ok { return float64(value) } @@ -176,7 +176,8 @@ func SeqBySamples(seqs obiseq.BioSequenceSlice, sample_key string) map[string]*o for _, s := range seqs { if s.HasStatsOn(sample_key) { stats := s.StatsOn(obiseq.MakeStatsOnDescription(sample_key), "NA") - for k := range stats { + stats.RLock() + for k := range stats.Map() { if seqset, ok := samples[k]; ok { *seqset = append(*seqset, s) samples[k] = seqset @@ -184,6 +185,7 @@ func SeqBySamples(seqs obiseq.BioSequenceSlice, sample_key string) map[string]*o samples[k] = &obiseq.BioSequenceSlice{s} } } + stats.RUnlock() } else { if k, ok := s.GetStringAttribute(sample_key); ok { if seqset, ok := samples[k]; ok { @@ -295,57 +297,80 @@ func MinionDenoise(graph *obigraph.Graph[*obiseq.BioSequence, Mutation], bar = progressbar.NewOptions(len(*graph.Vertices), pbopt...) } - for i, v := range *graph.Vertices { + wg := &sync.WaitGroup{} + seqidxchan := make(chan int) + build := func() { var err error var clean *obiseq.BioSequence - degree := graph.Degree(i) - if degree > 4 { - pack := obiseq.MakeBioSequenceSlice(degree + 1) - for k, j := range graph.Neighbors(i) { - pack[k] = (*graph.Vertices)[j] - } - pack[degree] = v - clean, err = BuildConsensus(pack, - fmt.Sprintf("%s_consensus", v.Id()), - kmer_size, CLILowCoverage(), - CLISaveGraphToFiles(), CLIGraphFilesDirectory()) - if err != nil { - log.Warning(err) - clean = (*graph.Vertices)[i].Copy() + for i := range seqidxchan { + v := (*graph.Vertices)[i] + + degree := graph.Degree(i) + if degree > 4 { + pack := obiseq.MakeBioSequenceSlice(degree + 1) + for k, j := range graph.Neighbors(i) { + pack[k] = (*graph.Vertices)[j] + } + pack[degree] = v + clean, err = BuildConsensus(pack, + fmt.Sprintf("%s_consensus", v.Id()), + kmer_size, CLILowCoverage(), + CLISaveGraphToFiles(), CLIGraphFilesDirectory()) + + if err != nil { + log.Warning(err) + clean = (*graph.Vertices)[i].Copy() + clean.SetAttribute("obiconsensus_consensus", false) + + } + + } else { + clean = obiseq.NewBioSequence(v.Id(), v.Sequence(), v.Definition()) clean.SetAttribute("obiconsensus_consensus", false) - } - } else { - clean = obiseq.NewBioSequence(v.Id(), v.Sequence(), v.Definition()) - clean.SetAttribute("obiconsensus_consensus", false) - } + clean.SetCount(int(graph.VertexWeight(i))) + clean.SetAttribute(sample_key, graph.Name) - clean.SetCount(int(graph.VertexWeight(i))) - clean.SetAttribute(sample_key, graph.Name) + if !clean.HasAttribute("obiconsensus_weight") { + clean.SetAttribute("obiconsensus_weight", int(1)) + } - if !clean.HasAttribute("obiconsensus_weight") { - clean.SetAttribute("obiconsensus_weight", int(1)) - } + annotations := v.Annotations() - annotations := v.Annotations() + staton := obiseq.StatsOnSlotName(sample_key) + for k, v := range annotations { + if !clean.HasAttribute(k) && k != staton { + clean.SetAttribute(k, v) + } + } - staton := obiseq.StatsOnSlotName(sample_key) - for k, v := range annotations { - if !clean.HasAttribute(k) && k != staton { - clean.SetAttribute(k, v) + denoised[i] = clean + + if bar != nil { + bar.Add(1) } } - denoised[i] = clean - - if bar != nil { - bar.Add(1) - } - + wg.Done() } + nworkers := obidefault.ParallelWorkers() + wg.Add(nworkers) + + for j := 0; j < nworkers; j++ { + go build() + } + + for i := range *graph.Vertices { + seqidxchan <- i + } + + close(seqidxchan) + + wg.Wait() + return denoised } diff --git a/pkg/obitools/obidemerge/demerge.go b/pkg/obitools/obidemerge/demerge.go index a0bfc17..aefc76a 100644 --- a/pkg/obitools/obidemerge/demerge.go +++ b/pkg/obitools/obidemerge/demerge.go @@ -13,10 +13,12 @@ func MakeDemergeWorker(key string) obiseq.SeqWorker { if sequence.HasStatsOn(key) { stats := sequence.StatsOn(desc, "NA") sequence.DeleteAttribute(obiseq.StatsOnSlotName(key)) - slice := obiseq.NewBioSequenceSlice(len(stats)) + slice := obiseq.NewBioSequenceSlice(stats.Len()) i := 0 - for k, v := range stats { + stats.RLock() + defer stats.RUnlock() + for k, v := range stats.Map() { (*slice)[i] = sequence.Copy() (*slice)[i].SetAttribute(key, k) (*slice)[i].SetCount(v)