From 84b3e4d0979e238691ebffe4f6232b9e5b6a69a9 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 31 Mar 2023 10:53:53 +0200 Subject: [PATCH] Reduce memomry inprint of obipcr Former-commit-id: bd25be2d454f083c729346a828e27f07ad1a216e --- cmd/obitools/obipcr/main.go | 5 ++- pkg/obiiter/fragment.go | 72 ++++++++++++++++++++++++++++++++++ pkg/obioptions/options.go | 28 ++++++++----- pkg/obitools/obipcr/options.go | 41 ++++++++++++------- pkg/obitools/obipcr/pcr.go | 43 ++++++++++++++------ 5 files changed, 149 insertions(+), 40 deletions(-) create mode 100644 pkg/obiiter/fragment.go diff --git a/cmd/obitools/obipcr/main.go b/cmd/obitools/obipcr/main.go index d925d4f..bd86d6c 100644 --- a/cmd/obitools/obipcr/main.go +++ b/cmd/obitools/obipcr/main.go @@ -5,7 +5,6 @@ import ( log "github.com/sirupsen/logrus" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -26,6 +25,8 @@ func main() { // trace.Start(ftrace) // defer trace.Stop() + obioptions.SetWorkerPerCore(1) + optionParser := obioptions.GenerateOptionParser(obipcr.OptionSet) _, args := optionParser(os.Args) @@ -37,7 +38,7 @@ func main() { os.Exit(1) } - amplicons, _ := obipcr.PCR(sequences) + amplicons, _ := obipcr.CLIPCR(sequences) obiconvert.CLIWriteBioSequences(amplicons, true) obiiter.WaitForLastPipe() diff --git a/pkg/obiiter/fragment.go b/pkg/obiiter/fragment.go new file mode 100644 index 0000000..3857c3b --- /dev/null +++ b/pkg/obiiter/fragment.go @@ -0,0 +1,72 @@ +package obiiter + +import ( + log "github.com/sirupsen/logrus" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" +) + +func IFragments(minsize, length, overlap, size, nworkers int) Pipeable { + step := length - overlap + + ifrg := func(iterator IBioSequence) IBioSequence { + order := obiutils.AtomicCounter() + newiter := MakeIBioSequence() + iterator = iterator.SortBatches() + + newiter.Add(nworkers) + + go func() { + newiter.WaitAndClose() + }() + + f := func(iterator IBioSequence, id int) { + news := obiseq.MakeBioSequenceSlice() + for iterator.Next() { + sl := iterator.Get() + for _, s := range sl.Slice() { + if s.Len() <= minsize { + news = append(news, s) + } else { + for i := 0; i < s.Len(); i += step { + end := obiutils.MinInt(i+length, s.Len()) + frg, err := s.Subsequence(i, end, false) + if err != nil { + log.Panicln(err) + } + news = append(news, frg) + if len(news) >= size { + newiter.Push(MakeBioSequenceBatch(order(), news)) + news = obiseq.MakeBioSequenceSlice() + } + } + s.Recycle() + } + if len(news) >= size { + o := order() + newiter.Push(MakeBioSequenceBatch(o, news)) + news = obiseq.MakeBioSequenceSlice() + } + } // End of the slice loop + sl.Recycle(false) + } // End of the iterator loop + + if len(news) > 0 { + newiter.Push(MakeBioSequenceBatch(order(), news)) + } + + newiter.Done() + + } + + for i := 1; i < nworkers; i++ { + go f(iterator.Split(), i) + } + go f(iterator, 0) + + return newiter + } + + return ifrg +} diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 66077e8..ce29519 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -11,11 +11,10 @@ import ( "net/http" _ "net/http/pprof" - ) var _Debug = false -var _ParallelWorkers = runtime.NumCPU()*2 - 1 +var _WorkerPerCore = 2 var _MaxAllowedCPU = runtime.NumCPU() var _BatchSize = 5000 var _Pprof = false @@ -31,9 +30,9 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser options.BoolVar(&_Debug, "debug", false) options.BoolVar(&_Pprof, "pprof", false) - options.IntVar(&_ParallelWorkers, "workers", _ParallelWorkers, - options.Alias("w"), - options.Description("Number of parallele threads computing the result")) + // options.IntVar(&_ParallelWorkers, "workers", _ParallelWorkers, + // options.Alias("w"), + // options.Description("Number of parallele threads computing the result")) options.IntVar(&_MaxAllowedCPU, "max-cpu", _MaxAllowedCPU, options.GetEnv("OBIMAXCPU"), @@ -74,16 +73,13 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser runtime.GOMAXPROCS(_MaxAllowedCPU) if options.Called("max-cpu") { log.Printf("CPU number limited to %d", _MaxAllowedCPU) - if !options.Called("workers") { - _ParallelWorkers = _MaxAllowedCPU*2 - 1 - log.Printf("Number of workers set %d", _ParallelWorkers) - } } if options.Called("no-singleton") { log.Printf("No singleton option set") } + log.Printf("Number of workers set %d", CLIParallelWorkers()) return options, remaining } } @@ -96,7 +92,7 @@ func CLIIsDebugMode() bool { // CLIParallelWorkers returns the number of parallel workers requested by // the command line option --workers|-w. func CLIParallelWorkers() int { - return _ParallelWorkers + return _MaxAllowedCPU * _WorkerPerCore } // CLIParallelWorkers returns the number of parallel workers requested by @@ -119,3 +115,15 @@ func DebugOn() { func DebugOff() { _Debug = false } + +func SetWorkerPerCore(n int) { + _WorkerPerCore = n +} + +func WorkerPerCore() int { + return _WorkerPerCore +} + +func SetBatchSize(n int) { + _BatchSize = n +} diff --git a/pkg/obitools/obipcr/options.go b/pkg/obitools/obipcr/options.go index 0d5fc0a..edf380d 100644 --- a/pkg/obitools/obipcr/options.go +++ b/pkg/obitools/obipcr/options.go @@ -1,3 +1,5 @@ +// It adds to a CLI every options proposed to the user to tune the parametters of the PCR simulation +// algorithm package obipcr import ( @@ -14,13 +16,14 @@ var _ReversePrimer string var _AllowedMismatch = 0 var _MinimumLength = 0 var _MaximumLength = -1 +var _Fragmented = false // PCROptionSet defines every options related to a simulated PCR. // // The function adds to a CLI every options proposed to the user // to tune the parametters of the PCR simulation algorithm. // -// Parameters +// # Parameters // // - option : is a pointer to a getoptions.GetOpt instance normaly // produced by the @@ -29,6 +32,9 @@ func PCROptionSet(options *getoptions.GetOpt) { options.Alias("c"), options.Description("Considers that sequences are [c]ircular.")) + options.BoolVar(&_Fragmented, "fragmented", false, + options.Description("Fragaments long sequences in overlaping fragments to speedup computations")) + options.StringVar(&_ForwardPrimer, "forward", "", options.Required("You must provide a forward primer"), options.Description("The forward primer used for the electronic PCR.")) @@ -46,6 +52,7 @@ func PCROptionSet(options *getoptions.GetOpt) { options.Description("Minimum length of the barcode (primers excluded).")) options.IntVar(&_MaximumLength, "max-length", -1, options.Alias("L"), + options.Required("You must indicate the maximum size of the amplicon (excluded primer length)"), options.Description("Maximum length of the barcode (primers excluded).")) } @@ -56,11 +63,11 @@ func OptionSet(options *getoptions.GetOpt) { PCROptionSet(options) } -// ForwardPrimer returns the sequence of the forward primer as indicated by the +// CLIForwardPrimer returns the sequence of the forward primer as indicated by the // --forward command line option -func ForwardPrimer() string { +func CLIForwardPrimer() string { pattern, err := obiapat.MakeApatPattern(_ForwardPrimer, _AllowedMismatch, false) - + if err != nil { log.Fatalf("%+v", err) } @@ -70,10 +77,10 @@ func ForwardPrimer() string { return _ForwardPrimer } -// ReversePrimer returns the sequence of the reverse primer as indicated by the +// CLIReversePrimer returns the sequence of the reverse primer as indicated by the // --reverse command line option -func ReversePrimer() string { - pattern, err := obiapat.MakeApatPattern(_ReversePrimer, _AllowedMismatch,false) +func CLIReversePrimer() string { + pattern, err := obiapat.MakeApatPattern(_ReversePrimer, _AllowedMismatch, false) if err != nil { log.Fatalf("%+v", err) @@ -84,27 +91,31 @@ func ReversePrimer() string { return _ReversePrimer } -// AllowedMismatch returns the allowed mistmatch count between each +// CLIAllowedMismatch returns the allowed mistmatch count between each // primer and the sequences as indicated by the // --allowed-mismatches|-e command line option -func AllowedMismatch() int { +func CLIAllowedMismatch() int { return _AllowedMismatch } -// Circular returns the considered sequence topology as indicated by the +// CLICircular returns the considered sequence topology as indicated by the // --circular|-c command line option -func Circular() bool { +func CLICircular() bool { return _Circular } -// MinLength returns the amplicon minimum length as indicated by the +// CLIMinLength returns the amplicon minimum length as indicated by the // --min-length|-l command line option -func MinLength() int { +func CLIMinLength() int { return _MinimumLength } -// MaxLength returns the amplicon maximum length as indicated by the +// CLIMaxLength returns the amplicon maximum length as indicated by the // --max-length|-L command line option -func MaxLength() int { +func CLIMaxLength() int { return _MaximumLength } + +func CLIFragmented() bool { + return _Fragmented +} diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index 83f7902..72ff109 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -4,39 +4,56 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiapat" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" + log "github.com/sirupsen/logrus" ) -// PCR iterates over sequences provided by a obiseq.IBioSequenceBatch +// CLIPCR iterates over sequences provided by a obiseq.IBioSequenceBatch // and returns an other obiseq.IBioSequenceBatch distributing // obiseq.BioSequenceBatch containing the selected amplicon sequences. -func PCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) { +func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) { opts := make([]obiapat.WithOption, 0, 10) opts = append(opts, obiapat.OptionForwardPrimer( - ForwardPrimer(), - AllowedMismatch(), + CLIForwardPrimer(), + CLIAllowedMismatch(), ), obiapat.OptionReversePrimer( - ReversePrimer(), - AllowedMismatch(), + CLIReversePrimer(), + CLIAllowedMismatch(), ), ) - if MinLength() > 0 { - opts = append(opts, obiapat.OptionMinLength(MinLength())) + if CLIMinLength() > 0 { + opts = append(opts, obiapat.OptionMinLength(CLIMinLength())) } - if MaxLength() > 0 { - opts = append(opts, obiapat.OptionMaxLength(MaxLength())) - } + opts = append(opts, obiapat.OptionMaxLength(CLIMaxLength())) - if Circular() { - opts = append(opts, obiapat.OptionCircular(Circular())) + if CLICircular() { + opts = append(opts, obiapat.OptionCircular(CLICircular())) } worker := obiapat.PCRSliceWorker(opts...) + if CLIFragmented() { + frags := obiiter.IFragments( + CLIMaxLength()*1000, + CLIMaxLength()*100, + CLIMaxLength()+obiutils.MaxInt(len(CLIForwardPrimer()), + len(CLIReversePrimer()))+obiutils.MinInt(len(CLIForwardPrimer()), + len(CLIReversePrimer()))/2, + 100, + obioptions.CLIParallelWorkers(), + ) + log.Infof("Fragmenting sequence longer than %dbp into chuncks of %dbp", + CLIMaxLength()*100000000, + CLIMaxLength()*100, + ) + iterator = iterator.Pipe(frags) + } + return iterator.MakeISliceWorker(worker, obioptions.CLIParallelWorkers(), 0), nil }