diff --git a/pkg/obiiter/workers.go b/pkg/obiiter/workers.go index 51331c3..bf5bb5e 100644 --- a/pkg/obiiter/workers.go +++ b/pkg/obiiter/workers.go @@ -19,6 +19,14 @@ func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker { return f } +// That method allows for applying a SeqWorker function on every sequences. +// +// Sequences are provided by the iterator and modified sequences are pushed +// on the returned IBioSequenceBatch. +// +// Moreover the SeqWorker function, the method accepted two optional integer parameters. +// - First is allowing to indicates the number of workers running in parallele (default 4) +// - The second the size of the chanel buffer. By default set to the same value than the input buffer. func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequenceBatch { nworkers := 4 buffsize := iterator.BufferSize() @@ -61,6 +69,51 @@ func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IB return newIter } +func (iterator IBioSequenceBatch) MakeIConditionalWorker(predicate obiseq.SequencePredicate, + worker SeqWorker, sizes ...int) IBioSequenceBatch { + nworkers := 4 + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + nworkers = sizes[0] + } + + if len(sizes) > 1 { + buffsize = sizes[1] + } + + newIter := MakeIBioSequenceBatch(buffsize) + + newIter.Add(nworkers) + + go func() { + newIter.WaitAndClose() + log.Debugln("End of the batch workers") + + }() + + f := func(iterator IBioSequenceBatch) { + for iterator.Next() { + batch := iterator.Get() + for i, seq := range batch.slice { + if predicate(batch.slice[i]) { + batch.slice[i] = worker(seq) + } + } + newIter.Push(batch) + } + newIter.Done() + } + + log.Debugln("Start of the batch workers") + for i := 0; i < nworkers-1; i++ { + go f(iterator.Split()) + } + go f(iterator) + + return newIter +} + func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes ...int) IBioSequenceBatch { nworkers := 4 buffsize := iterator.BufferSize() diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index 64e67e8..d5bafc3 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -199,7 +199,7 @@ func (s *BioSequence) Annotations() Annotation { } // A method that returns the value of the key in the annotation map. -func (s *BioSequence) Get(key string) (interface{}, bool) { +func (s *BioSequence) GetAttribute(key string) (interface{}, bool) { var val interface{} ok := s.annotations != nil @@ -210,12 +210,17 @@ func (s *BioSequence) Get(key string) (interface{}, bool) { return val, ok } +func (s *BioSequence) SetAttribute(key string, value interface{}) { + annot := s.Annotations() + annot[key] = value +} + // A method that returns the value of the key in the annotation map. -func (s *BioSequence) GetInt(key string) (int, bool) { +func (s *BioSequence) GetIntAttribute(key string) (int, bool) { var val int var err error - v, ok := s.Get(key) + v, ok := s.GetAttribute(key) if ok { val, err = goutils.InterfaceToInt(v) @@ -226,9 +231,9 @@ func (s *BioSequence) GetInt(key string) (int, bool) { } // A method that returns the value of the key in the annotation map. -func (s *BioSequence) GetString(key string) (string, bool) { +func (s *BioSequence) GetStringAttribute(key string) (string, bool) { var val string - v, ok := s.Get(key) + v, ok := s.GetAttribute(key) if ok { val = fmt.Sprint(v) @@ -242,7 +247,7 @@ func (s *BioSequence) GetBool(key string) (bool, bool) { var val bool var err error - v, ok := s.Get(key) + v, ok := s.GetAttribute(key) if ok { val, err = goutils.InterfaceToBool(v) @@ -259,7 +264,7 @@ func (s *BioSequence) MD5() [16]byte { // Returning the number of times the sequence has been observed. func (s *BioSequence) Count() int { - count, ok := s.GetInt("count") + count, ok := s.GetIntAttribute("count") if !ok { count = 1 @@ -270,7 +275,7 @@ func (s *BioSequence) Count() int { // Returning the taxid of the sequence. func (s *BioSequence) Taxid() int { - taxid, ok := s.GetInt("taxid") + taxid, ok := s.GetIntAttribute("taxid") if !ok { taxid = 1 @@ -330,6 +335,12 @@ func (s *BioSequence) WriteByteQualities(data byte) error { return nil } +// Clearing the sequence. +func (s *BioSequence) ClearQualities() { + s.qualities = s.qualities[0:0] +} + + // A method that appends a byte slice to the sequence. func (s *BioSequence) Write(data []byte) (int, error) { s.sequence = append(s.sequence, data...) @@ -347,3 +358,8 @@ func (s *BioSequence) WriteByte(data byte) error { s.sequence = append(s.sequence, data) return nil } + +// Clearing the sequence. +func (s *BioSequence) Clear() { + s.sequence = s.sequence[0:0] +} diff --git a/pkg/obitax/path.go b/pkg/obitax/path.go index e44daee..19bec5e 100644 --- a/pkg/obitax/path.go +++ b/pkg/obitax/path.go @@ -2,6 +2,8 @@ package obitax import ( "fmt" + + log "github.com/sirupsen/logrus" ) func (taxon *TaxNode) Path() (*TaxonSlice, error) { @@ -22,6 +24,34 @@ func (taxon *TaxNode) Path() (*TaxonSlice, error) { return &path, nil } +func (taxon *TaxNode) TaxonAtRank(rank string) *TaxNode { + for taxon.rank != rank && taxon != taxon.pparent { + taxon = taxon.pparent + + if taxon == nil { + log.Panicln("Taxonomy must be reindexed") + } + } + + if taxon == taxon.pparent { + taxon = nil + } + + return taxon +} + +func (taxon *TaxNode) Species() *TaxNode { + return taxon.TaxonAtRank("species") +} + +func (taxon *TaxNode) Genus() *TaxNode { + return taxon.TaxonAtRank("genus") +} + +func (taxon *TaxNode) Family() *TaxNode { + return taxon.TaxonAtRank("family") +} + // Returns a TaxonSet listing the requested taxon and all // its ancestors in the taxonomy down to the root. func (taxonomy *Taxonomy) Path(taxid int) (*TaxonSlice, error) { diff --git a/pkg/obitax/sequence_methods.go b/pkg/obitax/sequence_methods.go new file mode 100644 index 0000000..08b8ee0 --- /dev/null +++ b/pkg/obitax/sequence_methods.go @@ -0,0 +1,37 @@ +package obitax + +import ( + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + +// Setting the taxon at a given rank for a given sequence. +// +// Two attributes are added to the sequence. One named by the rank name stores +// the taxid, a second named by the rank name suffixed with '_name' contains the +// Scientific name of the genus. +// If the taxon at the given rank doesn't exist for the taxonomy annotation +// of the sequence, nothing happens. +func (taxonomy *Taxonomy) SetTaxonAtRank(sequence *obiseq.BioSequence, rank string) *TaxNode { + taxid := sequence.Taxid() + taxon, err := taxonomy.Taxon(taxid) + taxonAtRank := taxon.TaxonAtRank(rank) + + if err == nil && taxonAtRank != nil { + sequence.SetAttribute(rank, taxonAtRank.taxid) + sequence.SetAttribute(rank+"_name", taxonAtRank.scientificname) + } + + return taxonAtRank +} + +func (taxonomy *Taxonomy) SetSpecies(sequence *obiseq.BioSequence) *TaxNode { + return taxonomy.SetTaxonAtRank(sequence, "species") +} + +func (taxonomy *Taxonomy) SetGenus(sequence *obiseq.BioSequence) *TaxNode { + return taxonomy.SetTaxonAtRank(sequence, "genus") +} + +func (taxonomy *Taxonomy) SetFamily(sequence *obiseq.BioSequence) *TaxNode { + return taxonomy.SetTaxonAtRank(sequence, "family") +} diff --git a/pkg/obitax/sequence_predicate.go b/pkg/obitax/sequence_predicate.go index 9379e49..3ca6d94 100644 --- a/pkg/obitax/sequence_predicate.go +++ b/pkg/obitax/sequence_predicate.go @@ -22,7 +22,10 @@ func (taxonomy *Taxonomy) IsAValidTaxon(withAutoCorrection ...bool) obiseq.Seque if err == nil && taxon.taxid != taxid { if autocorrection { sequence.SetTaxid(taxon.taxid) - log.Printf("Sequence %s : Taxid %d updated with %d", taxid, taxon.taxid) + log.Printf("Sequence %s : Taxid %d updated with %d", + sequence.Id(), + taxid, + taxon.taxid) } else { if _, ok := deprecatedTaxidsWarning[taxid]; !ok { deprecatedTaxidsWarning[taxid] = true diff --git a/pkg/obitax/sequence_workers.go b/pkg/obitax/sequence_workers.go new file mode 100644 index 0000000..bfa591b --- /dev/null +++ b/pkg/obitax/sequence_workers.go @@ -0,0 +1,56 @@ +package obitax + +import ( + "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + log "github.com/sirupsen/logrus" +) + +func (taxonomy *Taxonomy) MakeSetTaxonAtRankWorker(rank string) obiiter.SeqWorker { + + if !goutils.Contains(taxonomy.RankList(), rank) { + log.Fatalf("%s is not a valid rank (allowed ranks are %v)", + rank, + taxonomy.RankList()) + } + + w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + taxonomy.SetTaxonAtRank(sequence, rank) + return sequence + } + + return w +} + +func (taxonomy *Taxonomy) MakeSetSpeciesWorker() obiiter.SeqWorker { + + w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + taxonomy.SetSpecies(sequence) + return sequence + } + + return w +} + +func (taxonomy *Taxonomy) MakeSetGenusWorker() obiiter.SeqWorker { + + w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + taxonomy.SetGenus(sequence) + return sequence + } + + return w +} + +func (taxonomy *Taxonomy) MakeSetFamilyWorker() obiiter.SeqWorker { + + w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + taxonomy.SetFamily(sequence) + return sequence + } + + return w +} + + diff --git a/pkg/obitools/obiclean/graph.go b/pkg/obitools/obiclean/graph.go index cce3218..6d67d60 100644 --- a/pkg/obitools/obiclean/graph.go +++ b/pkg/obitools/obiclean/graph.go @@ -17,12 +17,35 @@ import ( ) type Ratio struct { + Sample string From int To int + CFrom int + CTo int Pos int Length int } +type Edge struct { + Father int + From byte + To byte + Pos int + NucPair int + Dist int +} + +func makeEdge(father, dist, pos int, from, to byte) Edge { + return Edge{ + Father: father, + Dist: dist, + Pos: pos, + From: from, + To: to, + NucPair: nucPair(from, to), + } +} + func abs(x int) int { if x < 0 { return -x @@ -74,11 +97,19 @@ func EmpiricalDistCsv(filename string, data [][]Ratio) { bar := progressbar.NewOptions(len(data), pbopt...) - fmt.Fprintln(file, "From,To,Count_from,Count_to,Position,length") + fmt.Fprintln(file, "Sample,From,To,Weight_from,Weight_to,Count_from,Count_to,Position,length") for code, dist := range data { a1, a2 := intToNucPair(code) for _, ratio := range dist { - fmt.Fprintf(file, "%c,%c,%d,%d,%d,%d\n", a1, a2, ratio.From, ratio.To, ratio.Pos, ratio.Length) + fmt.Fprintf(file, "%s,%c,%c,%d,%d,%d,%d,%d,%d\n", + ratio.Sample, + a1, a2, + ratio.From, + ratio.To, + ratio.CFrom, + ratio.CTo, + ratio.Pos, + ratio.Length) } bar.Add(1) } @@ -94,11 +125,11 @@ func Gml(seqs *[]*seqPCR, sample string, statThreshold int) string { comment "Obiclean graph for sample {{ Name }}" directed 1 {{range $index, $data:= .}} - {{ if or $data.Fathers (gt $data.SonCount 0)}} + {{ if or $data.Edges (gt $data.SonCount 0)}} node [ id {{$index}} graphics [ type "{{ Shape $data.Count }}" - fill "{{ if and (gt $data.SonCount 0) (not $data.Fathers)}}#0000FF{{ else }}#00FF00{{ end }}" + fill "{{ if and (gt $data.SonCount 0) (not $data.Edges)}}#0000FF{{ else }}#00FF00{{ end }}" h {{ Sqrt $data.Count }} w {{ Sqrt $data.Count }} ] @@ -108,11 +139,11 @@ func Gml(seqs *[]*seqPCR, sample string, statThreshold int) string { {{ end }} {{range $index, $data:= .}} - {{range $i, $father:= $data.Fathers}} + {{range $i, $edge:= $data.Edges}} edge [ source {{$index}} - target {{$father}} - color "{{ if gt (index $data.Dist $i) 1 }}#FF0000{{ else }}#00FF00{{ end }}" - label "{{(index $data.Dist $i)}}" + target {{$edge.Father}} + color "{{ if gt (index $data.Edges $i).Dist 1 }}#FF0000{{ else }}#00FF00{{ end }}" + label "{{(index $data.Edges $i).Dist}}" ] {{ end }} {{ end }} @@ -226,51 +257,83 @@ func intToNucPair(code int) (a, b byte) { return decode[c1], decode[c2] } -func buildSamplePairs(seqs *[]*seqPCR, minStatRatio int, workers int) ([][]Ratio, int) { +func reweightSequences(seqs *[]*seqPCR) { + + for _, node := range *seqs { + node.Weight = node.Count + } + + //var rfunc func(*seqPCR) + + rfunc := func(node *seqPCR) { + node.AddedSons=0 + nedges := len(node.Edges) + if nedges > 0 { + swf := 0.0 + + for k := 0; k < nedges; k++ { + swf += float64((*seqs)[node.Edges[k].Father].Count) + } + + for k := 0; k < nedges; k++ { + father := (*seqs)[node.Edges[k].Father] + father.Weight += int(math.Round(float64(node.Weight) * float64(father.Count) / swf)) + father.AddedSons++ + // log.Println(father.AddedSons, father.SonCount) + } + + } + } + + for _, node := range *seqs { + if node.SonCount == 0 { + rfunc(node) + } + } + + for done := true; done; { + done = false + for _, node := range *seqs { + if node.SonCount > 0 && node.SonCount == node.AddedSons { + // log.Println(node.AddedSons, node.SonCount) + rfunc(node) + done = true + } + } + } +} + +func buildSamplePairs(seqs *[]*seqPCR, workers int) int { nseq := len(*seqs) running := sync.WaitGroup{} - linePairs := func(i int) [][]Ratio { + linePairs := func(i int) { - ratio := make([][]Ratio, 25) son := (*seqs)[i] for j := i + 1; j < nseq; j++ { father := (*seqs)[j] d, pos, a1, a2 := obialign.D1Or0(son.Sequence, father.Sequence) if d > 0 { - son.Fathers = append(son.Fathers, j) - son.Dist = append(son.Dist, d) + son.Edges = append(son.Edges, makeEdge(j, d, pos, a1, a2)) father.SonCount++ - if father.Count > minStatRatio { - n := nucPair(a1, a2) - ratio[n] = append(ratio[n], Ratio{father.Count, son.Count, pos, father.Sequence.Length()}) - } } } - return ratio } lineChan := make(chan int) - idxChan := make(chan [][]Ratio) ff := func() { for i := range lineChan { - idxChan <- linePairs(i) + linePairs(i) } - running.Done() } running.Add(workers) - go func() { - running.Wait() - close(idxChan) - }() - for i := 0; i < workers; i++ { go ff() } @@ -283,15 +346,12 @@ func buildSamplePairs(seqs *[]*seqPCR, minStatRatio int, workers int) ([][]Ratio }() np := nseq * (nseq - 1) / 2 - ratio := make([][]Ratio, 25) - for data := range idxChan { - for i, r := range data { - ratio[i] = append(ratio[i], r...) - } - } + running.Wait() - return ratio, np + reweightSequences(seqs) + + return np } func extendSimilarityGraph(seqs *[]*seqPCR, step int, workers int) int { @@ -310,8 +370,7 @@ func extendSimilarityGraph(seqs *[]*seqPCR, step int, workers int) int { matrix) d := (lali - lcs) if lcs >= 0 && d <= step && step > 0 { - son.Fathers = append(son.Fathers, j) - son.Dist = append(son.Dist, d) + son.Edges = append(son.Edges, makeEdge(j, d, -1, '-', '-')) father.SonCount++ //a, b := minMax((*seqs)[i].Count, (*seqs)[j].Count) } @@ -340,7 +399,7 @@ func extendSimilarityGraph(seqs *[]*seqPCR, step int, workers int) int { go func() { for i := 0; i < nseq; i++ { - if len((*seqs)[i].Fathers) == 0 { + if len((*seqs)[i].Edges) == 0 { lineChan <- i } } @@ -354,21 +413,18 @@ func extendSimilarityGraph(seqs *[]*seqPCR, step int, workers int) int { func FilterGraphOnRatio(seqs *[]*seqPCR, ratio float64) { for _, s1 := range *seqs { - c1 := float64(s1.Count) - f := s1.Fathers - d := s1.Dist + c1 := float64(s1.Weight) + e := s1.Edges j := 0 - for i, s2 := range f { - f[j] = f[i] - d[j] = d[i] - if (c1 / float64((*seqs)[s2].Count)) <= math.Pow(ratio, float64(d[i])) { + for i, s2 := range e { + e[j] = e[i] + if (c1 / float64((*seqs)[s2.Father].Weight)) <= math.Pow(ratio, float64(e[i].Dist)) { j++ } else { - (*seqs)[s2].SonCount-- + (*seqs)[s2.Father].SonCount-- } } - s1.Fathers = f[0:j] - s1.Dist = d[0:j] + s1.Edges = e[0:j] } } @@ -384,7 +440,7 @@ func sortSamples(samples map[string]*([]*seqPCR)) { } func ObicleanStatus(seq *seqPCR) string { - if len(seq.Fathers) == 0 { + if len(seq.Edges) == 0 { if seq.SonCount > 0 { return "h" } else { @@ -395,8 +451,28 @@ func ObicleanStatus(seq *seqPCR) string { } } +func EstimateRatio(samples map[string]*[]*seqPCR, minStatRatio int) [][]Ratio { + ratio := make([][]Ratio, 25) + + for name, seqs := range samples { + + for _, seq := range *seqs { + for _, edge := range seq.Edges { + father := (*seqs)[edge.Father] + if father.Weight >= minStatRatio && edge.Dist == 1 { + ratio[edge.NucPair] = append(ratio[edge.NucPair], Ratio{name, father.Weight, seq.Weight, father.Count, seq.Count, edge.Pos, father.Sequence.Length()}) + } + } + + } + } + + return ratio + +} + func BuildSeqGraph(samples map[string]*[]*seqPCR, - maxError, minStatRatio, workers int) [][]Ratio { + maxError, workers int) { sortSamples(samples) @@ -416,13 +492,8 @@ func BuildSeqGraph(samples map[string]*[]*seqPCR, ) bar := progressbar.NewOptions(npairs, pbopt...) - all_ratio := make([][]Ratio, 25) for _, seqs := range samples { - ratio, np := buildSamplePairs(seqs, minStatRatio, workers) - - for i, r := range ratio { - all_ratio[i] = append(all_ratio[i], r...) - } + np := buildSamplePairs(seqs, workers) bar.Add(np) } @@ -444,6 +515,4 @@ func BuildSeqGraph(samples map[string]*[]*seqPCR, bar.Add(np) } } - - return all_ratio } diff --git a/pkg/obitools/obiclean/obiclean.go b/pkg/obitools/obiclean/obiclean.go index b25a075..50afa8d 100644 --- a/pkg/obitools/obiclean/obiclean.go +++ b/pkg/obitools/obiclean/obiclean.go @@ -13,11 +13,12 @@ import ( ) type seqPCR struct { - Count int // number of reads associated to a sequence in a PCR - Sequence *obiseq.BioSequence // pointer to the corresponding sequence - SonCount int - Fathers []int - Dist []int + Count int // number of reads associated to a sequence in a PCR + Weight int // Number of reads associated to a sequence after clustering + Sequence *obiseq.BioSequence // pointer to the corresponding sequence + SonCount int + AddedSons int + Edges []Edge } // buildSamples sorts the sequences by samples @@ -43,9 +44,10 @@ func buildSamples(dataset obiseq.BioSequenceSlice, } *pcr = append(*pcr, &seqPCR{ - Count: v, - Sequence: s, - SonCount: 0, + Count: v, + Sequence: s, + SonCount: 0, + AddedSons: 0, }) } } @@ -181,6 +183,33 @@ func Status(sequence *obiseq.BioSequence) map[string]string { return obistatus } +func Weight(sequence *obiseq.BioSequence) map[string]int { + annotation := sequence.Annotations() + iobistatus, ok := annotation["obiclean_weight"] + var weight map[string]int + var err error + + if ok { + switch iobistatus := iobistatus.(type) { + case map[string]int: + weight = iobistatus + case map[string]interface{}: + weight = make(map[string]int) + for k, v := range iobistatus { + weight[k], err = goutils.InterfaceToInt(v) + if err != nil { + log.Panicf("Weight value %v cannnot be casted to an integer value\n", v) + } + } + } + } else { + weight = make(map[string]int) + annotation["obiclean_weight"] = weight + } + + return weight +} + func IOBIClean(itertator obiiter.IBioSequenceBatch) obiiter.IBioSequenceBatch { db := itertator.Load() @@ -191,9 +220,8 @@ func IOBIClean(itertator obiiter.IBioSequenceBatch) obiiter.IBioSequenceBatch { log.Infof("Dataset composed of %d samples\n", len(samples)) - all_ratio := BuildSeqGraph(samples, + BuildSeqGraph(samples, DistStepMax(), - MinCountToEvalMutationRate(), obioptions.CLIParallelWorkers()) if RatioMax() < 1.0 { @@ -215,6 +243,7 @@ func IOBIClean(itertator obiiter.IBioSequenceBatch) obiiter.IBioSequenceBatch { } if IsSaveRatioTable() { + all_ratio := EstimateRatio(samples, MinCountToEvalMutationRate()) EmpiricalDistCsv(RatioTableFilename(), all_ratio) } @@ -237,6 +266,9 @@ func IOBIClean(itertator obiiter.IBioSequenceBatch) obiiter.IBioSequenceBatch { for _, pcr := range *seqs { obistatus := Status(pcr.Sequence) obistatus[name] = ObicleanStatus(pcr) + + obiweight := Weight(pcr.Sequence) + obiweight[name] = pcr.Weight } bar.Add(1) }