Correct the number of workers

Former-commit-id: febbccfb853263e0761ecfccb0f09c8c1bf88475
This commit is contained in:
2023-11-22 09:46:30 +01:00
parent 8905a16bc0
commit 2e0c1bd801
11 changed files with 206 additions and 39 deletions

View File

@ -45,7 +45,7 @@ func main() {
os.Exit(1)
}
summary := obisummary.ISummary(fs)
summary := obisummary.ISummary(fs, obisummary.CLIMapSummary())
if obisummary.CLIOutFormat() == "json" {
output, _ := json.MarshalIndent(summary, "", " ")

View File

@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
@ -60,8 +61,8 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence,
classifier *obiseq.BioSequenceClassifier,
nworkers int) (obiiter.IBioSequence, error) {
if nworkers <=0 {
nworkers = 4
if nworkers <= 0 {
nworkers = obioptions.CLIParallelWorkers()
}
newIter := obiiter.MakeIBioSequence()

View File

@ -10,6 +10,7 @@ import (
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils"
"github.com/tevino/abool/v2"
@ -566,7 +567,7 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate,
// A function that takes a predicate and a batch of sequences and returns a filtered batch of sequences.
func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate,
size int, sizes ...int) IBioSequence {
nworkers := 4
nworkers := obioptions.CLIReadParallelWorkers()
if len(sizes) > 0 {
nworkers = sizes[0]
@ -618,7 +619,7 @@ func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate,
func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate,
size int, sizes ...int) IBioSequence {
nworkers := 4
nworkers := obioptions.CLIReadParallelWorkers()
if len(sizes) > 0 {
nworkers = sizes[0]

View File

@ -58,7 +58,7 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int)
func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePredicate,
worker obiseq.SeqWorker, sizes ...int) IBioSequence {
nworkers := 4
nworkers := obioptions.CLIReadParallelWorkers()
if len(sizes) > 0 {
nworkers = sizes[0]
@ -101,7 +101,7 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre
}
func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, sizes ...int) IBioSequence {
nworkers := 4
nworkers := obioptions.CLIParallelWorkers()
if len(sizes) > 0 {
nworkers = sizes[0]
@ -119,7 +119,11 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, size
f := func(iterator IBioSequence) {
for iterator.Next() {
batch := iterator.Get()
bs := len(batch.slice)
batch.slice = worker(batch.slice)
if bs != len(batch.slice) {
log.Warnf("Input size : %d output %d", bs, len(batch.slice))
}
newIter.Push(batch)
}
newIter.Done()

View File

@ -239,6 +239,9 @@ func (s *BioSequence) String() string {
// It does not take any parameters.
// It returns an integer representing the length of the sequence.
func (s *BioSequence) Len() int {
	// Only a non-nil receiver has a sequence buffer to measure; a nil
	// receiver is reported as having length zero.
	if s != nil {
		return len(s.sequence)
	}
	return 0
}

View File

@ -1,7 +1,6 @@
package obiseq
import (
"errors"
"fmt"
)
@ -17,15 +16,21 @@ import (
// - error: an error if the subsequence parameters are invalid.
func (sequence *BioSequence) Subsequence(from, to int, circular bool) (*BioSequence, error) {
if from >= to && !circular {
return nil, errors.New("from greater than to")
return nil, fmt.Errorf("from: %d greater than to: %d", from, to)
}
if from < 0 || from >= sequence.Len() {
return nil, errors.New("from out of bounds")
if from < 0 {
return nil, fmt.Errorf("from out of bounds %d < 0", from)
}
if to <= 0 || to > sequence.Len() {
return nil, errors.New("to out of bounds")
if from >= sequence.Len() {
return nil,
fmt.Errorf("from out of bounds %d >= %d", from, sequence.Len())
}
if to > sequence.Len() {
return nil,
fmt.Errorf("to out of bounds %d > %d", to, sequence.Len())
}
var newSeq *BioSequence

View File

@ -1,5 +1,7 @@
package obiseq
import log "github.com/sirupsen/logrus"
type SeqAnnotator func(*BioSequence)
type SeqWorker func(*BioSequence) *BioSequence
@ -17,7 +19,8 @@ func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker {
return f
}
func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker {
func SeqToSliceWorker(worker SeqWorker,
inplace, breakOnError bool) SeqSliceWorker {
var f SeqSliceWorker
if worker == nil {
@ -25,12 +28,12 @@ func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker {
f = func(input BioSequenceSlice) BioSequenceSlice {
return input
}
} else {
} else {
f = func(input BioSequenceSlice) BioSequenceSlice {
output := MakeBioSequenceSlice(len(input))
copy(output,input)
copy(output, input)
return output
}
}
}
} else {
f = func(input BioSequenceSlice) BioSequenceSlice {
@ -38,12 +41,21 @@ func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker {
if !inplace {
output = MakeBioSequenceSlice(len(input))
}
for i, s := range input {
output[i] = worker(s)
i := 0
for _, s := range input {
r := worker(s)
if r != nil {
output[i] = r
i++
} else if breakOnError {
log.Fatalf("got an error on sequence %s processing",
r.Id())
}
}
return output
}
return output[0:i]
}
}
return f
@ -51,10 +63,10 @@ func SeqToSliceWorker(worker SeqWorker, inplace bool) SeqSliceWorker {
func SeqToSliceConditionalWorker(worker SeqWorker,
condition SequencePredicate,
inplace bool) SeqSliceWorker {
inplace, breakOnError bool) SeqSliceWorker {
if condition == nil {
return SeqToSliceWorker(worker,inplace)
return SeqToSliceWorker(worker, inplace, breakOnError)
}
f := func(input BioSequenceSlice) BioSequenceSlice {
@ -62,15 +74,23 @@ func SeqToSliceConditionalWorker(worker SeqWorker,
if !inplace {
output = MakeBioSequenceSlice(len(input))
}
for i, s := range input {
i := 0
for _, s := range input {
if condition(s) {
output[i] = worker(s)
} else {
output[i] = s
r := worker(s)
if r != nil {
output[i] = r
i++
} else if breakOnError {
log.Fatalf("got an error on sequence %s processing",
r.Id())
}
}
}
return output
return output[0:i]
}
return f
@ -83,9 +103,12 @@ func (worker SeqWorker) ChainWorkers(next SeqWorker) SeqWorker {
if next == nil {
return worker
}
}
}
f := func(seq *BioSequence) *BioSequence {
if seq == nil {
return nil
}
return next(worker(seq))
}

View File

@ -1,7 +1,7 @@
package obiannotate
import (
"log"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obicorazick"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
@ -22,6 +22,15 @@ func DeleteAttributesWorker(toBeDeleted []string) obiseq.SeqWorker {
return f
}
// func MatchPatternWorker(pattern string, errormax int, allowsIndel bool) obiseq.SeqWorker {
// pat, err := obiapat.MakeApatPattern(pattern, errormax, allowsIndel)
// f := func(s *obiseq.BioSequence) *obiseq.BioSequence {
// apats := obiapat.MakeApatSequence(s, false)
// pat.BestMatch(apats, 0)
// return s
// }
// }
func ToBeKeptAttributesWorker(toBeKept []string) obiseq.SeqWorker {
d := make(map[string]bool, len(_keepOnly))
@ -43,6 +52,58 @@ func ToBeKeptAttributesWorker(toBeKept []string) obiseq.SeqWorker {
return f
}
// CutSequenceWorker builds a SeqWorker that replaces every sequence it
// receives by the subsequence delimited by from and to.
//
// Parameters:
//   - from: start of the region. A positive value is decremented by one
//     before being handed to Subsequence (presumably converting a 1-based
//     position to a 0-based offset — confirm against Subsequence). A negative
//     value is resolved against the processed sequence as Len() + from + 1.
//   - to: end of the region, passed to Subsequence unchanged when positive and
//     resolved as Len() + to + 1 when negative.
//   - breakOnError: when true, a failing Subsequence call terminates the
//     program through log.Fatalf; otherwise a warning is logged and the worker
//     returns nil so the sequence is discarded.
//
// When both from and to are zero the returned worker hands the sequence back
// untouched.
//
// The bounds are resolved and clamped on per-call locals: the start is never
// lower than 0 and the end never greater than the sequence length. The
// captured from/to values are never written to, so the worker carries no state
// from one sequence to the next and can be shared between goroutines.
func CutSequenceWorker(from, to int, breakOnError bool) obiseq.SeqWorker {
	if from == 0 && to == 0 {
		return func(s *obiseq.BioSequence) *obiseq.BioSequence {
			return s
		}
	}

	// Shift a positive start once, up front, instead of mutating the
	// parameter after the closure has been created.
	start := from
	if start > 0 {
		start--
	}
	end := to

	return func(s *obiseq.BioSequence) *obiseq.BioSequence {
		length := s.Len()

		// Negative bounds are counted from the end of this sequence.
		lo, hi := start, end
		if lo < 0 {
			lo = length + lo + 1
		}
		if hi < 0 {
			hi = length + hi + 1
		}

		// Clamp the resolved bounds to the sequence limits.
		if lo < 0 {
			lo = 0
		}
		if hi > length {
			hi = length
		}

		rep, err := s.Subsequence(lo, hi, false)
		if err != nil {
			if breakOnError {
				log.Fatalf("Cannot cut sequence %s (%v)", s.Id(), err)
			}
			log.Warnf("Cannot cut sequence %s (%v), sequence discarded", s.Id(), err)
			return nil
		}

		return rep
	}
}
func ClearAllAttributesWorker() obiseq.SeqWorker {
f := func(s *obiseq.BioSequence) *obiseq.BioSequence {
annot := s.Annotations()
@ -81,7 +142,6 @@ func EvalAttributeWorker(expression map[string]string) obiseq.SeqWorker {
return w
}
func AddTaxonAtRankWorker(taxonomy *obitax.Taxonomy, ranks ...string) obiseq.SeqWorker {
f := func(s *obiseq.BioSequence) *obiseq.BioSequence {
for _, r := range ranks {
@ -162,6 +222,13 @@ func CLIAnnotationWorker() obiseq.SeqWorker {
annotator = annotator.ChainWorkers(w)
}
if CLIHasCut() {
from, to := CLICut()
w := CutSequenceWorker(from, to, false)
annotator = annotator.ChainWorkers(w)
}
return annotator
}
@ -170,7 +237,7 @@ func CLIAnnotationPipeline() obiiter.Pipeable {
predicate := obigrep.CLISequenceSelectionPredicate()
worker := CLIAnnotationWorker()
annotator := obiseq.SeqToSliceConditionalWorker(worker, predicate, true)
annotator := obiseq.SeqToSliceConditionalWorker(worker, predicate, true, false)
f := obiiter.SliceWorkerPipe(annotator, obioptions.CLIParallelWorkers())
return f

View File

@ -1,11 +1,12 @@
package obiannotate
import (
"io/ioutil"
"log"
"os"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obigrep"
"github.com/DavidGamba/go-getoptions"
@ -22,9 +23,11 @@ var _clearAll = false
var _setSeqLength = false
var _uniqueID = false
var _ahoCorazick = ""
var _pattern = ""
var _lcaSlot = ""
var _lcaError = 0.0
var _setId = ""
var _cut = ""
func SequenceAnnotationOptionSet(options *getoptions.GetOpt) {
// options.BoolVar(&_addRank, "seq-rank", _addRank,
@ -42,6 +45,13 @@ func SequenceAnnotationOptionSet(options *getoptions.GetOpt) {
options.StringVar(&_ahoCorazick, "aho-corasick", _ahoCorazick,
options.Description("Adds an aho-corasick attribut with the count of matches of the provided patterns."))
options.StringVar(&_pattern, "pattern", _pattern,
options.Description("Adds a pattern attribut containing the pattern, a pattern_match slot "+
"indicating the matched sequence, "+
"and a pattern_error slot indicating the number difference between the pattern and the match "+
"to the sequence.",
))
options.StringVar(&_lcaSlot, "add-lca-in", _lcaSlot,
options.ArgName("SLOT_NAME"),
options.Description("From the taxonomic annotation of the sequence (taxid slot or merged_taxid slot), "+
@ -59,6 +69,10 @@ func SequenceAnnotationOptionSet(options *getoptions.GetOpt) {
"estimated LCA."),
)
options.StringVar(&_cut, "cut", _cut,
options.ArgName("###:###"),
options.Description("A pattern decribing how to cut the sequence"))
// options.BoolVar(&_uniqueID, "uniq-id", _uniqueID,
// options.Description("Forces sequence record ids to be unique."),
// )
@ -133,10 +147,9 @@ func CLIHasSetId() bool {
}
func CLSetIdExpression() string {
return _setId
return _setId
}
// CLIHasAttributeToBeRenamed reports whether _toBeRenamed holds at least one
// entry.
func CLIHasAttributeToBeRenamed() bool {
	return len(_toBeRenamed) != 0
}
@ -191,7 +204,7 @@ func CLIHasAhoCorasick() bool {
}
func CLIAhoCorazick() []string {
content, err := ioutil.ReadFile(_ahoCorazick)
content, err := os.ReadFile(_ahoCorazick)
if err != nil {
log.Fatalln("Cannot open file ", _ahoCorazick)
}
@ -221,3 +234,33 @@ func CLIHasAddLCA() bool {
// CLILCAThreshold returns 1 - _lcaError, i.e. the complement of the error
// value held in the package-level _lcaError variable.
// NOTE(review): presumably _lcaError is a fraction in [0, 1] set from the
// command line — confirm against the option declaration.
func CLILCAThreshold() float64 {
return 1 - _lcaError
}
// CLICut parses the value of the --cut option, expected as "start:end", and
// returns the two integers in that order.
//
// It returns 0, 0 when the option is empty. A value that does not split into
// exactly two ':'-separated fields, or a field that is not an integer,
// terminates the program through log.Fatalf.
func CLICut() (int, int) {
	if len(_cut) == 0 {
		return 0, 0
	}

	fields := strings.Split(_cut, ":")
	if len(fields) != 2 {
		log.Fatalf("Invalid cut value %s. value should be of the form start:end", _cut)
	}

	// Convert both fields in order; the first one that fails is reported.
	var bounds [2]int
	for i, field := range fields {
		value, err := strconv.Atoi(field)
		if err != nil {
			log.Fatalf("Invalid cut value %s. value %s should be an integer", _cut, field)
		}
		bounds[i] = value
	}

	return bounds[0], bounds[1]
}
// CLIHasCut reports whether the --cut option yields two non-zero bounds, as
// parsed by CLICut.
func CLIHasCut() bool {
	from, to := CLICut()
	if from == 0 || to == 0 {
		return false
	}
	return true
}

View File

@ -23,6 +23,7 @@ type DataSummary struct {
sample_variants map[string]int
sample_singletons map[string]int
sample_obiclean_bad map[string]int
map_summaries map[string]map[string]int
}
func NewDataSummary() *DataSummary {
@ -40,6 +41,7 @@ func NewDataSummary() *DataSummary {
sample_variants: make(map[string]int),
sample_singletons: make(map[string]int),
sample_obiclean_bad: make(map[string]int),
map_summaries: make(map[string]map[string]int),
}
}
@ -150,13 +152,19 @@ func (data *DataSummary) Update(s *obiseq.BioSequence) *DataSummary {
return data
}
func ISummary(iterator obiiter.IBioSequence) map[string]interface{} {
func ISummary(iterator obiiter.IBioSequence, summarise []string) map[string]interface{} {
nproc := obioptions.CLIParallelWorkers()
waiter := sync.WaitGroup{}
summaries := make([]*DataSummary, nproc)
for n := 0; n < nproc; n++ {
for _, v := range summarise {
summaries[n].map_summaries[v] = make(map[string]int, 0)
}
}
ff := func(iseq obiiter.IBioSequence, summary *DataSummary) {
for iseq.Next() {

View File

@ -11,6 +11,7 @@ import (
var __json_output__ = false
var __yaml_output__ = false
var __map_summary__ = make([]string, 0)
func SummaryOptionSet(options *getoptions.GetOpt) {
options.BoolVar(&__json_output__, "json-output", false,
@ -18,6 +19,9 @@ func SummaryOptionSet(options *getoptions.GetOpt) {
options.BoolVar(&__yaml_output__, "yaml-output", false,
options.Description("Print results as YAML record."))
options.StringSliceVar(&__map_summary__, "map", 1, 1,
options.Description("Name of a map attribute."))
}
func OptionSet(options *getoptions.GetOpt) {
@ -32,3 +36,11 @@ func CLIOutFormat() string {
return "json"
}
// CLIHasMapSummary reports whether at least one attribute name has been
// collected in __map_summary__.
func CLIHasMapSummary() bool {
	return len(__map_summary__) != 0
}
// CLIMapSummary returns the package-level __map_summary__ slice, which is the
// target of the "map" command-line option. The slice itself is returned, not a
// copy.
func CLIMapSummary() []string {
return __map_summary__
}