From 2e0c1bd801669f15f429053b83cd7977fdf0ffc6 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 22 Nov 2023 09:46:30 +0100 Subject: [PATCH] Correct the number of workers Former-commit-id: febbccfb853263e0761ecfccb0f09c8c1bf88475 --- cmd/obitools/obisummary/main.go | 2 +- pkg/obichunk/subchunks.go | 5 +- pkg/obiiter/batchiterator.go | 5 +- pkg/obiiter/workers.go | 8 ++- pkg/obiseq/biosequence.go | 3 + pkg/obiseq/subseq.go | 17 ++++-- pkg/obiseq/worker.go | 57 +++++++++++++------ pkg/obitools/obiannotate/obiannotate.go | 73 ++++++++++++++++++++++++- pkg/obitools/obiannotate/options.go | 53 ++++++++++++++++-- pkg/obitools/obisummary/obisummary.go | 10 +++- pkg/obitools/obisummary/options.go | 12 ++++ 11 files changed, 206 insertions(+), 39 deletions(-) diff --git a/cmd/obitools/obisummary/main.go b/cmd/obitools/obisummary/main.go index ad13d78..cfd1c63 100644 --- a/cmd/obitools/obisummary/main.go +++ b/cmd/obitools/obisummary/main.go @@ -45,7 +45,7 @@ func main() { os.Exit(1) } - summary := obisummary.ISummary(fs) + summary := obisummary.ISummary(fs, obisummary.CLIMapSummary()) if obisummary.CLIOutFormat() == "json" { output, _ := json.MarshalIndent(summary, "", " ") diff --git a/pkg/obichunk/subchunks.go b/pkg/obichunk/subchunks.go index 240d2f6..79ca4fc 100644 --- a/pkg/obichunk/subchunks.go +++ b/pkg/obichunk/subchunks.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -60,8 +61,8 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence, classifier *obiseq.BioSequenceClassifier, nworkers int) (obiiter.IBioSequence, error) { - if nworkers <=0 { - nworkers = 4 + if nworkers <= 0 { + nworkers = obioptions.CLIParallelWorkers() } newIter := obiiter.MakeIBioSequence() diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index f6901ec..e9bce70 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" "github.com/tevino/abool/v2" @@ -566,7 +567,7 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, // A function that takes a predicate and a batch of sequences and returns a filtered batch of sequences. func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate, size int, sizes ...int) IBioSequence { - nworkers := 4 + nworkers := obioptions.CLIReadParallelWorkers() if len(sizes) > 0 { nworkers = sizes[0] @@ -618,7 +619,7 @@ func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate, func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate, size int, sizes ...int) IBioSequence { - nworkers := 4 + nworkers := obioptions.CLIReadParallelWorkers() if len(sizes) > 0 { nworkers = sizes[0] diff --git a/pkg/obiiter/workers.go b/pkg/obiiter/workers.go index 469895b..1bd90a4 100644 --- a/pkg/obiiter/workers.go +++ b/pkg/obiiter/workers.go @@ -58,7 +58,7 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePredicate, worker obiseq.SeqWorker, sizes ...int) IBioSequence { - nworkers := 4 + nworkers := obioptions.CLIReadParallelWorkers() if len(sizes) > 0 { nworkers = sizes[0] @@ -101,7 +101,7 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre } func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, sizes ...int) IBioSequence { - nworkers := 4 + nworkers := obioptions.CLIParallelWorkers() if len(sizes) > 0 { nworkers = sizes[0] @@ -119,7 +119,11 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, size f := func(iterator IBioSequence) { for iterator.Next() { batch := iterator.Get() + bs := len(batch.slice) batch.slice = worker(batch.slice) + if bs != len(batch.slice) { + log.Warnf("Input size : %d output %d", bs, len(batch.slice)) + } newIter.Push(batch) } newIter.Done() diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index 871036e..973bbce 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -239,6 +239,9 @@ func (s *BioSequence) String() string { // It does not take any parameters. // It returns an integer representing the length of the sequence. func (s *BioSequence) Len() int { + if s == nil { + return 0 + } return len(s.sequence) } diff --git a/pkg/obiseq/subseq.go b/pkg/obiseq/subseq.go index 5039e21..c288c2e 100644 --- a/pkg/obiseq/subseq.go +++ b/pkg/obiseq/subseq.go @@ -1,7 +1,6 @@ package obiseq import ( - "errors" "fmt" ) @@ -17,15 +16,21 @@ import ( // - error: an error if the subsequence parameters are invalid. func (sequence *BioSequence) Subsequence(from, to int, circular bool) (*BioSequence, error) { if from >= to && !circular { - return nil, errors.New("from greater than to") + return nil, fmt.Errorf("from: %d greater than to: %d", from, to) } - if from < 0 || from >= sequence.Len() { - return nil, errors.New("from out of bounds") + if from < 0 { + return nil, fmt.Errorf("from out of bounds %d < 0", from) } - if to <= 0 || to > sequence.Len() { - return nil, errors.New("to out of bounds") + if from >= sequence.Len() { + return nil, + fmt.Errorf("from out of bounds %d >= %d", from, sequence.Len()) + } + + if to > sequence.Len() { + return nil, + fmt.Errorf("to out of bounds %d > %d", to, sequence.Len()) } var newSeq *BioSequence diff --git a/pkg/obiseq/worker.go b/pkg/obiseq/worker.go index 6a1f5e7..a848706 100644 --- a/pkg/obiseq/worker.go +++ b/pkg/obiseq/worker.go @@ -1,5 +1,7 @@ package obiseq +import log "github.com/sirupsen/logrus" + type SeqAnnotator func(*BioSequence) type SeqWorker func(*BioSequence) *BioSequence @@ -17,7 +19,8 @@ func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker { return f } -func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker { +func SeqToSliceWorker(worker SeqWorker, + inplace, breakOnError bool) SeqSliceWorker { var f SeqSliceWorker if worker == nil { @@ -25,12 +28,12 @@ func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker { f = func(input BioSequenceSlice) BioSequenceSlice { return input } - } else { + } else { f = func(input BioSequenceSlice) BioSequenceSlice { output := MakeBioSequenceSlice(len(input)) - copy(output,input) + copy(output, input) return output - } + } } } else { f = func(input BioSequenceSlice) BioSequenceSlice { @@ -38,12 +41,21 @@ func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker { if !inplace { output = MakeBioSequenceSlice(len(input)) } - for i, s := range input { - output[i] = worker(s) + i := 0 + for _, s := range input { + r := worker(s) + if r != nil { + output[i] = r + i++ + } else if breakOnError { + log.Fatalf("got an error on sequence %s processing", + r.Id()) + } } - - return output - } + + return output[0:i] + } + } return f @@ -51,10 +63,10 @@ func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker { func SeqToSliceConditionalWorker(worker SeqWorker, condition SequencePredicate, - inplace bool) SeqSliceWorker { + inplace, breakOnError bool) SeqSliceWorker { if condition == nil { - return SeqToSliceWorker(worker,inplace) + return SeqToSliceWorker(worker, inplace, breakOnError) } f := func(input BioSequenceSlice) BioSequenceSlice { @@ -62,15 +74,23 @@ func SeqToSliceConditionalWorker(worker SeqWorker, if !inplace { output = MakeBioSequenceSlice(len(input)) } - for i, s := range input { + + i := 0 + + for _, s := range input { if condition(s) { - output[i] = worker(s) - } else { - output[i] = s + r := worker(s) + if r != nil { + output[i] = r + i++ + } else if breakOnError { + log.Fatalf("got an error on sequence %s processing", + r.Id()) + } } } - return output + return output[0:i] } return f @@ -83,9 +103,12 @@ func (worker SeqWorker) ChainWorkers(next SeqWorker) SeqWorker { if next == nil { return worker } - } + } f := func(seq *BioSequence) *BioSequence { + if seq == nil { + return nil + } return next(worker(seq)) } diff --git a/pkg/obitools/obiannotate/obiannotate.go b/pkg/obitools/obiannotate/obiannotate.go index 7f19984..66ee9f3 100644 --- a/pkg/obitools/obiannotate/obiannotate.go +++ b/pkg/obitools/obiannotate/obiannotate.go @@ -1,7 +1,7 @@ package obiannotate import ( - "log" + log "github.com/sirupsen/logrus" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obicorazick" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" @@ -22,6 +22,15 @@ func DeleteAttributesWorker(toBeDeleted []string) obiseq.SeqWorker { return f } +// func MatchPatternWorker(pattern string, errormax int, allowsIndel bool) obiseq.SeqWorker { +// pat, err := obiapat.MakeApatPattern(pattern, errormax, allowsIndel) +// f := func(s *obiseq.BioSequence) *obiseq.BioSequence { +// apats := obiapat.MakeApatSequence(s, false) +// pat.BestMatch(apats, 0) +// return s +// } +// } + func ToBeKeptAttributesWorker(toBeKept []string) obiseq.SeqWorker { d := make(map[string]bool, len(_keepOnly)) @@ -43,6 +52,58 @@ func ToBeKeptAttributesWorker(toBeKept []string) obiseq.SeqWorker { return f } +func CutSequenceWorker(from, to int, breakOnError bool) obiseq.SeqWorker { + + f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + var f, t int + + switch { + case from < 0: + f = s.Len() + from + 1 + case from > 0: + f = from + } + + switch { + case to < 0: + t = s.Len() + to + 1 + case to > 0: + t = to + } + + if from < 0 { + from = 0 + } + + if to >= s.Len() { + to = s.Len() + } + + rep, err := s.Subsequence(f, t, false) + if err != nil { + if breakOnError { + log.Fatalf("Cannot cut sequence %s (%v)", s.Id(), err) + } else { + log.Warnf("Cannot cut sequence %s (%v), sequence discarded", s.Id(), err) + return nil + } + } + return rep + } + + if from == 0 && to == 0 { + f = func(s *obiseq.BioSequence) *obiseq.BioSequence { + return s + } + } + + if from > 0 { + from-- + } + + return f +} + func ClearAllAttributesWorker() obiseq.SeqWorker { f := func(s *obiseq.BioSequence) *obiseq.BioSequence { annot := s.Annotations() @@ -81,7 +142,6 @@ func EvalAttributeWorker(expression map[string]string) obiseq.SeqWorker { return w } - func AddTaxonAtRankWorker(taxonomy *obitax.Taxonomy, ranks ...string) obiseq.SeqWorker { f := func(s *obiseq.BioSequence) *obiseq.BioSequence { for _, r := range ranks { @@ -162,6 +222,13 @@ func CLIAnnotationWorker() obiseq.SeqWorker { annotator = annotator.ChainWorkers(w) } + if CLIHasCut() { + from, to := CLICut() + w := CutSequenceWorker(from, to, false) + + annotator = annotator.ChainWorkers(w) + } + return annotator } @@ -170,7 +237,7 @@ func CLIAnnotationPipeline() obiiter.Pipeable { predicate := obigrep.CLISequenceSelectionPredicate() worker := CLIAnnotationWorker() - annotator := obiseq.SeqToSliceConditionalWorker(worker, predicate, true) + annotator := obiseq.SeqToSliceConditionalWorker(worker, predicate, true, false) f := obiiter.SliceWorkerPipe(annotator, obioptions.CLIParallelWorkers()) return f diff --git a/pkg/obitools/obiannotate/options.go b/pkg/obitools/obiannotate/options.go index 8132be0..8737999 100644 --- a/pkg/obitools/obiannotate/options.go +++ b/pkg/obitools/obiannotate/options.go @@ -1,11 +1,12 @@ package obiannotate import ( - "io/ioutil" - "log" "os" + "strconv" "strings" + log "github.com/sirupsen/logrus" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obigrep" "github.com/DavidGamba/go-getoptions" @@ -22,9 +23,11 @@ var _clearAll = false var _setSeqLength = false var _uniqueID = false var _ahoCorazick = "" +var _pattern = "" var _lcaSlot = "" var _lcaError = 0.0 var _setId = "" +var _cut = "" func SequenceAnnotationOptionSet(options *getoptions.GetOpt) { // options.BoolVar(&_addRank, "seq-rank", _addRank, @@ -42,6 +45,13 @@ func SequenceAnnotationOptionSet(options *getoptions.GetOpt) { options.StringVar(&_ahoCorazick, "aho-corasick", _ahoCorazick, options.Description("Adds an aho-corasick attribut with the count of matches of the provided patterns.")) + options.StringVar(&_pattern, "pattern", _pattern, + options.Description("Adds a pattern attribut containing the pattern, a pattern_match slot "+ + "indicating the matched sequence, "+ + "and a pattern_error slot indicating the number difference between the pattern and the match "+ + "to the sequence.", + )) + options.StringVar(&_lcaSlot, "add-lca-in", _lcaSlot, options.ArgName("SLOT_NAME"), options.Description("From the taxonomic annotation of the sequence (taxid slot or merged_taxid slot), "+ @@ -59,6 +69,10 @@ func SequenceAnnotationOptionSet(options *getoptions.GetOpt) { "estimated LCA."), ) + options.StringVar(&_cut, "cut", _cut, + options.ArgName("###:###"), + options.Description("A pattern decribing how to cut the sequence")) + // options.BoolVar(&_uniqueID, "uniq-id", _uniqueID, // options.Description("Forces sequence record ids to be unique."), // ) @@ -133,10 +147,9 @@ func CLIHasSetId() bool { } func CLSetIdExpression() string { - return _setId + return _setId } - func CLIHasAttributeToBeRenamed() bool { return len(_toBeRenamed) > 0 } @@ -191,7 +204,7 @@ func CLIHasAhoCorasick() bool { } func CLIAhoCorazick() []string { - content, err := ioutil.ReadFile(_ahoCorazick) + content, err := os.ReadFile(_ahoCorazick) if err != nil { log.Fatalln("Cannot open file ", _ahoCorazick) } @@ -221,3 +234,33 @@ func CLIHasAddLCA() bool { func CLILCAThreshold() float64 { return 1 - _lcaError } + +func CLICut() (int, int) { + if _cut == "" { + return 0, 0 + } + values := strings.Split(_cut, ":") + + if len(values) != 2 { + log.Fatalf("Invalid cut value %s. value should be of the form start:end", _cut) + } + + start, err := strconv.Atoi(values[0]) + + if err != nil { + log.Fatalf("Invalid cut value %s. value %s should be an integer", _cut, values[0]) + } + end, err := strconv.Atoi(values[1]) + + if err != nil { + log.Fatalf("Invalid cut value %s. value %s should be an integer", _cut, values[1]) + } + + return start, end +} + +func CLIHasCut() bool { + f, t := CLICut() + + return f != 0 && t != 0 +} diff --git a/pkg/obitools/obisummary/obisummary.go b/pkg/obitools/obisummary/obisummary.go index 0b0eb82..71373dc 100644 --- a/pkg/obitools/obisummary/obisummary.go +++ b/pkg/obitools/obisummary/obisummary.go @@ -23,6 +23,7 @@ type DataSummary struct { sample_variants map[string]int sample_singletons map[string]int sample_obiclean_bad map[string]int + map_summaries map[string]map[string]int } func NewDataSummary() *DataSummary { @@ -40,6 +41,7 @@ func NewDataSummary() *DataSummary { sample_variants: make(map[string]int), sample_singletons: make(map[string]int), sample_obiclean_bad: make(map[string]int), + map_summaries: make(map[string]map[string]int), } } @@ -150,13 +152,19 @@ func (data *DataSummary) Update(s *obiseq.BioSequence) *DataSummary { return data } -func ISummary(iterator obiiter.IBioSequence) map[string]interface{} { +func ISummary(iterator obiiter.IBioSequence, summarise []string) map[string]interface{} { nproc := obioptions.CLIParallelWorkers() waiter := sync.WaitGroup{} summaries := make([]*DataSummary, nproc) + for n := 0; n < nproc; n++ { + for _, v := range summarise { + summaries[n].map_summaries[v] = make(map[string]int, 0) + } + } + ff := func(iseq obiiter.IBioSequence, summary *DataSummary) { for iseq.Next() { diff --git a/pkg/obitools/obisummary/options.go b/pkg/obitools/obisummary/options.go index c9b8b85..609c677 100644 --- a/pkg/obitools/obisummary/options.go +++ b/pkg/obitools/obisummary/options.go @@ -11,6 +11,7 @@ import ( var __json_output__ = false var __yaml_output__ = false +var __map_summary__ = make([]string, 0) func SummaryOptionSet(options *getoptions.GetOpt) { options.BoolVar(&__json_output__, "json-output", false, @@ -18,6 +19,9 @@ func SummaryOptionSet(options *getoptions.GetOpt) { options.BoolVar(&__yaml_output__, "yaml-output", false, options.Description("Print results as YAML record.")) + + options.StringSliceVar(&__map_summary__, "map", 1, 1, + options.Description("Name of a map attribute.")) } func OptionSet(options *getoptions.GetOpt) { @@ -32,3 +36,11 @@ func CLIOutFormat() string { return "json" } + +func CLIHasMapSummary() bool { + return len(__map_summary__) > 0 +} + +func CLIMapSummary() []string { + return __map_summary__ +}