Manage a lock on StatsOnValues

This commit is contained in:
Eric Coissac
2025-06-17 16:46:11 +02:00
parent 8a2bb1fe82
commit 9965370d85
9 changed files with 230 additions and 100 deletions

View File

@ -280,7 +280,7 @@ func _parse_json_header_(header string, sequence *obiseq.BioSequence) string {
if err != nil {
log.Fatalf("%s: Cannot parse merged slot %s: %v", sequence.Id(), skey, err)
} else {
annotations[skey] = data
annotations[skey] = obiseq.MapAsStatsOnValues(data)
}
} else {
log.Fatalf("%s: Cannot parse merged slot %s", sequence.Id(), skey)

View File

@ -216,11 +216,14 @@ func ParseOBIFeatures(text string, annotations obiseq.Annotation) string {
j = __obi_header_map_int_key__.ReplaceAll(j, []byte(`$1"$2":`))
var err error
switch {
case strings.HasPrefix(key, "merged_") ||
strings.HasSuffix(key, "_count"):
case strings.HasSuffix(key, "_count"):
dict := make(map[string]int)
err = json.Unmarshal(j, &dict)
value = dict
case strings.HasPrefix(key, "merged_"):
dict := make(map[string]int)
err = json.Unmarshal(j, &dict)
value = obiseq.MapAsStatsOnValues(dict)
case strings.HasSuffix(key, "_status") ||
strings.HasSuffix(key, "_mutation"):
dict := make(map[string]string)
@ -313,10 +316,20 @@ func WriteFastSeqOBIHeade(buffer *bytes.Buffer, sequence *obiseq.BioSequence) {
switch t := value.(type) {
case string:
buffer.WriteString(fmt.Sprintf("%s=%s; ", key, t))
case *obiseq.StatsOnValues:
t.RLock()
tv, err := obiutils.JsonMarshal(t.Map())
t.RUnlock()
if err != nil {
log.Fatalf("Cannot convert %v value", value)
}
tv = bytes.ReplaceAll(tv, []byte(`"`), []byte("'"))
buffer.WriteString(fmt.Sprintf("%s=", key))
buffer.Write(tv)
buffer.WriteString("; ")
case map[string]int,
map[string]string,
map[string]interface{},
obiseq.StatsOnValues:
map[string]interface{}:
tv, err := obiutils.JsonMarshal(t)
if err != nil {
log.Fatalf("Cannot convert %v value", value)

View File

@ -8,7 +8,7 @@ import (
// corresponds to the last commit, and not the one when the file will be
// commited
var _Commit = "efc3f3a"
var _Commit = "8a2bb1f"
var _Version = "Release 4.4.0"
// Version returns the version of the obitools package.

View File

@ -5,12 +5,18 @@ import (
"math"
"reflect"
"strings"
"sync"
"github.com/goccy/go-json"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
log "github.com/sirupsen/logrus"
)
type StatsOnValues map[string]int
type StatsOnValues struct {
counts map[string]int
lock sync.RWMutex
}
type StatsOnWeights func(sequence *BioSequence) int
type StatsOnDescription struct {
Name string
@ -51,6 +57,108 @@ func MakeStatsOnDescription(descriptor string) StatsOnDescription {
var _merge_prefix = "merged_"
func NewStatOnValues() *StatsOnValues {
v := StatsOnValues{
counts: make(map[string]int),
lock: sync.RWMutex{},
}
return &v
}
func MapAsStatsOnValues(m map[string]int) *StatsOnValues {
v := StatsOnValues{
counts: m,
lock: sync.RWMutex{},
}
return &v
}
func (sov *StatsOnValues) RLock() {
sov.lock.RLock()
}
func (sov *StatsOnValues) RUnlock() {
sov.lock.RUnlock()
}
func (sov *StatsOnValues) Lock() {
sov.lock.Lock()
}
func (sov *StatsOnValues) Unlock() {
sov.lock.Unlock()
}
func (sov *StatsOnValues) Get(key string) (int, bool) {
if sov == nil {
return -1, false
}
sov.RLock()
defer sov.RUnlock()
v, ok := sov.counts[key]
if !ok {
v = 0
}
return v, ok
}
func (sov *StatsOnValues) Map() map[string]int {
return sov.counts
}
func (sov *StatsOnValues) Set(key string, value int) {
if sov == nil {
return
}
sov.Lock()
defer sov.Unlock()
sov.counts[key] = value
}
func (sov *StatsOnValues) Add(key string, value int) int {
if sov == nil {
return -1
}
sov.Lock()
defer sov.Unlock()
v, ok := sov.counts[key]
if !ok {
v = 0
}
v += value
sov.counts[key] = v
return v
}
func (sov *StatsOnValues) Len() int {
sov.RLock()
defer sov.RUnlock()
return len(sov.counts)
}
func (sov *StatsOnValues) Keys() []string {
v := make([]string, 0, sov.Len())
sov.RLock()
defer sov.RUnlock()
for k := range sov.counts {
v = append(v, k)
}
return v
}
func (sov *StatsOnValues) MarshalJSON() ([]byte, error) {
sov.RLock()
defer sov.RUnlock()
return json.Marshal(sov.Map())
}
// StatsOnSlotName returns the name of the slot that summarizes statistics of occurrence for a given attribute.
//
// Parameters:
@ -89,41 +197,24 @@ func (sequence *BioSequence) HasStatsOn(key string) bool {
//
// Return type:
// - StatsOnValues
func (sequence *BioSequence) StatsOn(desc StatsOnDescription, na string) StatsOnValues {
func (sequence *BioSequence) StatsOn(desc StatsOnDescription, na string) *StatsOnValues {
var stats *StatsOnValues
var newstat bool
mkey := StatsOnSlotName(desc.Name)
istat, ok := sequence.GetAttribute(mkey)
var stats StatsOnValues
var newstat bool
if ok {
switch istat := istat.(type) {
case StatsOnValues:
stats = istat
newstat = false
case map[string]int:
stats = istat
newstat = false
case map[string]interface{}:
stats = make(StatsOnValues, len(istat))
newstat = false
var err error
for k, v := range istat {
stats[k], err = obiutils.InterfaceToInt(v)
if err != nil {
log.Panicf("In sequence %s : %s stat tag not only containing integer values %s",
sequence.Id(), mkey, istat)
}
}
default:
stats = make(StatsOnValues)
sequence.SetAttribute(mkey, stats)
newstat = true
}
} else {
stats = make(StatsOnValues)
if !ok {
stats = NewStatOnValues()
sequence.SetAttribute(mkey, stats)
newstat = true
} else {
stats, ok = istat.(*StatsOnValues)
if !ok {
log.Panicf("In sequence %s : %s is not a StatsOnValues type %T", sequence.Id(), mkey, istat)
}
newstat = false
}
if newstat {
@ -174,17 +265,7 @@ func (sequence *BioSequence) StatsPlusOne(desc StatsOnDescription, toAdd *BioSeq
}
dw := desc.Weight(toAdd)
sequence.annot_lock.Lock()
old, ok := stats[sval]
if !ok {
old = 0
}
stats[sval] = old + dw
sequence.annot_lock.Unlock()
sequence.SetAttribute(StatsOnSlotName(desc.Name), stats) // TODO: check if this is necessary
stats.Add(sval, desc.Weight(toAdd))
return retval
}
@ -192,13 +273,18 @@ func (sequence *BioSequence) StatsPlusOne(desc StatsOnDescription, toAdd *BioSeq
//
// It takes a parameter `toMerged` of type StatsOnValues, which represents the StatsOnValues to be merged.
// It returns a value of type StatsOnValues, which represents the merged StatsOnValues.
func (stats StatsOnValues) Merge(toMerged StatsOnValues) StatsOnValues {
for k, val := range toMerged {
old, ok := stats[k]
func (stats *StatsOnValues) Merge(toMerged *StatsOnValues) *StatsOnValues {
toMerged.RLock()
defer toMerged.RUnlock()
stats.Lock()
defer stats.Unlock()
for k, val := range toMerged.counts {
old, ok := stats.counts[k]
if !ok {
old = 0
}
stats[k] = old + val
stats.counts[k] = old + val
}
return stats

View File

@ -201,7 +201,7 @@ func OccurInAtleast(sample string, n int) SequencePredicate {
desc := MakeStatsOnDescription(sample)
f := func(sequence *BioSequence) bool {
stats := sequence.StatsOn(desc, "NA")
return len(stats) >= n
return stats.Len() >= n
}
return f

View File

@ -11,11 +11,13 @@ import (
func (sequence *BioSequence) TaxonomicDistribution(taxonomy *obitax.Taxonomy) map[*obitax.TaxNode]int {
taxids := sequence.StatsOn(MakeStatsOnDescription("taxid"), "na")
taxons := make(map[*obitax.TaxNode]int, len(taxids))
taxons := make(map[*obitax.TaxNode]int, taxids.Len())
taxonomy = taxonomy.OrDefault(true)
for taxid, v := range taxids {
taxids.RLock()
defer taxids.RUnlock()
for taxid, v := range taxids.Map() {
t, isAlias, err := taxonomy.Taxon(taxid)
if err != nil {
log.Fatalf(

View File

@ -2,7 +2,6 @@ package obiclean
import (
"fmt"
"maps"
"os"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
@ -38,7 +37,9 @@ func buildSamples(dataset obiseq.BioSequenceSlice,
for _, s := range dataset {
stats := s.StatsOn(obiseq.MakeStatsOnDescription(tag), NAValue)
for k, v := range stats {
stats.RLock()
defer stats.RUnlock()
for k, v := range stats.Map() {
pcr, ok := samples[k]
if !ok {
@ -123,9 +124,10 @@ func NotAlwaysChimera(tag string) obiseq.SequencePredicate {
if !ok || len(chimera) == 0 {
return true
}
samples := maps.Keys(sequence.StatsOn(descriptor, "NA"))
for s := range samples {
samples := sequence.StatsOn(descriptor, "NA").Keys()
for _, s := range samples {
if _, ok := chimera[s]; !ok {
return true
}

View File

@ -151,7 +151,7 @@ func SampleWeight(seqs *obiseq.BioSequenceSlice, sample, sample_key string) func
log.Panicf("Sample %s not found in sequence %d", sample, i)
}
if value, ok := stats[sample]; ok {
if value, ok := stats.Get(sample); ok {
return float64(value)
}
@ -176,7 +176,8 @@ func SeqBySamples(seqs obiseq.BioSequenceSlice, sample_key string) map[string]*o
for _, s := range seqs {
if s.HasStatsOn(sample_key) {
stats := s.StatsOn(obiseq.MakeStatsOnDescription(sample_key), "NA")
for k := range stats {
stats.RLock()
for k := range stats.Map() {
if seqset, ok := samples[k]; ok {
*seqset = append(*seqset, s)
samples[k] = seqset
@ -184,6 +185,7 @@ func SeqBySamples(seqs obiseq.BioSequenceSlice, sample_key string) map[string]*o
samples[k] = &obiseq.BioSequenceSlice{s}
}
}
stats.RUnlock()
} else {
if k, ok := s.GetStringAttribute(sample_key); ok {
if seqset, ok := samples[k]; ok {
@ -295,57 +297,80 @@ func MinionDenoise(graph *obigraph.Graph[*obiseq.BioSequence, Mutation],
bar = progressbar.NewOptions(len(*graph.Vertices), pbopt...)
}
for i, v := range *graph.Vertices {
wg := &sync.WaitGroup{}
seqidxchan := make(chan int)
build := func() {
var err error
var clean *obiseq.BioSequence
degree := graph.Degree(i)
if degree > 4 {
pack := obiseq.MakeBioSequenceSlice(degree + 1)
for k, j := range graph.Neighbors(i) {
pack[k] = (*graph.Vertices)[j]
}
pack[degree] = v
clean, err = BuildConsensus(pack,
fmt.Sprintf("%s_consensus", v.Id()),
kmer_size, CLILowCoverage(),
CLISaveGraphToFiles(), CLIGraphFilesDirectory())
if err != nil {
log.Warning(err)
clean = (*graph.Vertices)[i].Copy()
for i := range seqidxchan {
v := (*graph.Vertices)[i]
degree := graph.Degree(i)
if degree > 4 {
pack := obiseq.MakeBioSequenceSlice(degree + 1)
for k, j := range graph.Neighbors(i) {
pack[k] = (*graph.Vertices)[j]
}
pack[degree] = v
clean, err = BuildConsensus(pack,
fmt.Sprintf("%s_consensus", v.Id()),
kmer_size, CLILowCoverage(),
CLISaveGraphToFiles(), CLIGraphFilesDirectory())
if err != nil {
log.Warning(err)
clean = (*graph.Vertices)[i].Copy()
clean.SetAttribute("obiconsensus_consensus", false)
}
} else {
clean = obiseq.NewBioSequence(v.Id(), v.Sequence(), v.Definition())
clean.SetAttribute("obiconsensus_consensus", false)
}
} else {
clean = obiseq.NewBioSequence(v.Id(), v.Sequence(), v.Definition())
clean.SetAttribute("obiconsensus_consensus", false)
}
clean.SetCount(int(graph.VertexWeight(i)))
clean.SetAttribute(sample_key, graph.Name)
clean.SetCount(int(graph.VertexWeight(i)))
clean.SetAttribute(sample_key, graph.Name)
if !clean.HasAttribute("obiconsensus_weight") {
clean.SetAttribute("obiconsensus_weight", int(1))
}
if !clean.HasAttribute("obiconsensus_weight") {
clean.SetAttribute("obiconsensus_weight", int(1))
}
annotations := v.Annotations()
annotations := v.Annotations()
staton := obiseq.StatsOnSlotName(sample_key)
for k, v := range annotations {
if !clean.HasAttribute(k) && k != staton {
clean.SetAttribute(k, v)
}
}
staton := obiseq.StatsOnSlotName(sample_key)
for k, v := range annotations {
if !clean.HasAttribute(k) && k != staton {
clean.SetAttribute(k, v)
denoised[i] = clean
if bar != nil {
bar.Add(1)
}
}
denoised[i] = clean
if bar != nil {
bar.Add(1)
}
wg.Done()
}
nworkers := obidefault.ParallelWorkers()
wg.Add(nworkers)
for j := 0; j < nworkers; j++ {
go build()
}
for i := range *graph.Vertices {
seqidxchan <- i
}
close(seqidxchan)
wg.Wait()
return denoised
}

View File

@ -13,10 +13,12 @@ func MakeDemergeWorker(key string) obiseq.SeqWorker {
if sequence.HasStatsOn(key) {
stats := sequence.StatsOn(desc, "NA")
sequence.DeleteAttribute(obiseq.StatsOnSlotName(key))
slice := obiseq.NewBioSequenceSlice(len(stats))
slice := obiseq.NewBioSequenceSlice(stats.Len())
i := 0
for k, v := range stats {
stats.RLock()
defer stats.RUnlock()
for k, v := range stats.Map() {
(*slice)[i] = sequence.Copy()
(*slice)[i].SetAttribute(key, k)
(*slice)[i].SetCount(v)