Reduce memomry inprint of obipcr

Former-commit-id: bd25be2d454f083c729346a828e27f07ad1a216e
This commit is contained in:
2023-03-31 10:53:53 +02:00
parent 2c9bca5c8a
commit 84b3e4d097
5 changed files with 149 additions and 40 deletions

View File

@ -5,7 +5,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
@ -26,6 +25,8 @@ func main() {
// trace.Start(ftrace) // trace.Start(ftrace)
// defer trace.Stop() // defer trace.Stop()
obioptions.SetWorkerPerCore(1)
optionParser := obioptions.GenerateOptionParser(obipcr.OptionSet) optionParser := obioptions.GenerateOptionParser(obipcr.OptionSet)
_, args := optionParser(os.Args) _, args := optionParser(os.Args)
@ -37,7 +38,7 @@ func main() {
os.Exit(1) os.Exit(1)
} }
amplicons, _ := obipcr.PCR(sequences) amplicons, _ := obipcr.CLIPCR(sequences)
obiconvert.CLIWriteBioSequences(amplicons, true) obiconvert.CLIWriteBioSequences(amplicons, true)
obiiter.WaitForLastPipe() obiiter.WaitForLastPipe()

72
pkg/obiiter/fragment.go Normal file
View File

@ -0,0 +1,72 @@
package obiiter
import (
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils"
)
func IFragments(minsize, length, overlap, size, nworkers int) Pipeable {
step := length - overlap
ifrg := func(iterator IBioSequence) IBioSequence {
order := obiutils.AtomicCounter()
newiter := MakeIBioSequence()
iterator = iterator.SortBatches()
newiter.Add(nworkers)
go func() {
newiter.WaitAndClose()
}()
f := func(iterator IBioSequence, id int) {
news := obiseq.MakeBioSequenceSlice()
for iterator.Next() {
sl := iterator.Get()
for _, s := range sl.Slice() {
if s.Len() <= minsize {
news = append(news, s)
} else {
for i := 0; i < s.Len(); i += step {
end := obiutils.MinInt(i+length, s.Len())
frg, err := s.Subsequence(i, end, false)
if err != nil {
log.Panicln(err)
}
news = append(news, frg)
if len(news) >= size {
newiter.Push(MakeBioSequenceBatch(order(), news))
news = obiseq.MakeBioSequenceSlice()
}
}
s.Recycle()
}
if len(news) >= size {
o := order()
newiter.Push(MakeBioSequenceBatch(o, news))
news = obiseq.MakeBioSequenceSlice()
}
} // End of the slice loop
sl.Recycle(false)
} // End of the iterator loop
if len(news) > 0 {
newiter.Push(MakeBioSequenceBatch(order(), news))
}
newiter.Done()
}
for i := 1; i < nworkers; i++ {
go f(iterator.Split(), i)
}
go f(iterator, 0)
return newiter
}
return ifrg
}

View File

@ -11,11 +11,10 @@ import (
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
) )
var _Debug = false var _Debug = false
var _ParallelWorkers = runtime.NumCPU()*2 - 1 var _WorkerPerCore = 2
var _MaxAllowedCPU = runtime.NumCPU() var _MaxAllowedCPU = runtime.NumCPU()
var _BatchSize = 5000 var _BatchSize = 5000
var _Pprof = false var _Pprof = false
@ -31,9 +30,9 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
options.BoolVar(&_Debug, "debug", false) options.BoolVar(&_Debug, "debug", false)
options.BoolVar(&_Pprof, "pprof", false) options.BoolVar(&_Pprof, "pprof", false)
options.IntVar(&_ParallelWorkers, "workers", _ParallelWorkers, // options.IntVar(&_ParallelWorkers, "workers", _ParallelWorkers,
options.Alias("w"), // options.Alias("w"),
options.Description("Number of parallele threads computing the result")) // options.Description("Number of parallele threads computing the result"))
options.IntVar(&_MaxAllowedCPU, "max-cpu", _MaxAllowedCPU, options.IntVar(&_MaxAllowedCPU, "max-cpu", _MaxAllowedCPU,
options.GetEnv("OBIMAXCPU"), options.GetEnv("OBIMAXCPU"),
@ -74,16 +73,13 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
runtime.GOMAXPROCS(_MaxAllowedCPU) runtime.GOMAXPROCS(_MaxAllowedCPU)
if options.Called("max-cpu") { if options.Called("max-cpu") {
log.Printf("CPU number limited to %d", _MaxAllowedCPU) log.Printf("CPU number limited to %d", _MaxAllowedCPU)
if !options.Called("workers") {
_ParallelWorkers = _MaxAllowedCPU*2 - 1
log.Printf("Number of workers set %d", _ParallelWorkers)
}
} }
if options.Called("no-singleton") { if options.Called("no-singleton") {
log.Printf("No singleton option set") log.Printf("No singleton option set")
} }
log.Printf("Number of workers set %d", CLIParallelWorkers())
return options, remaining return options, remaining
} }
} }
@ -96,7 +92,7 @@ func CLIIsDebugMode() bool {
// CLIParallelWorkers returns the number of parallel workers requested by // CLIParallelWorkers returns the number of parallel workers requested by
// the command line option --workers|-w. // the command line option --workers|-w.
func CLIParallelWorkers() int { func CLIParallelWorkers() int {
return _ParallelWorkers return _MaxAllowedCPU * _WorkerPerCore
} }
// CLIParallelWorkers returns the number of parallel workers requested by // CLIParallelWorkers returns the number of parallel workers requested by
@ -119,3 +115,15 @@ func DebugOn() {
func DebugOff() { func DebugOff() {
_Debug = false _Debug = false
} }
func SetWorkerPerCore(n int) {
_WorkerPerCore = n
}
func WorkerPerCore() int {
return _WorkerPerCore
}
func SetBatchSize(n int) {
_BatchSize = n
}

View File

@ -1,3 +1,5 @@
// It adds to a CLI every options proposed to the user to tune the parametters of the PCR simulation
// algorithm
package obipcr package obipcr
import ( import (
@ -14,13 +16,14 @@ var _ReversePrimer string
var _AllowedMismatch = 0 var _AllowedMismatch = 0
var _MinimumLength = 0 var _MinimumLength = 0
var _MaximumLength = -1 var _MaximumLength = -1
var _Fragmented = false
// PCROptionSet defines every options related to a simulated PCR. // PCROptionSet defines every options related to a simulated PCR.
// //
// The function adds to a CLI every options proposed to the user // The function adds to a CLI every options proposed to the user
// to tune the parametters of the PCR simulation algorithm. // to tune the parametters of the PCR simulation algorithm.
// //
// Parameters // # Parameters
// //
// - option : is a pointer to a getoptions.GetOpt instance normaly // - option : is a pointer to a getoptions.GetOpt instance normaly
// produced by the // produced by the
@ -29,6 +32,9 @@ func PCROptionSet(options *getoptions.GetOpt) {
options.Alias("c"), options.Alias("c"),
options.Description("Considers that sequences are [c]ircular.")) options.Description("Considers that sequences are [c]ircular."))
options.BoolVar(&_Fragmented, "fragmented", false,
options.Description("Fragaments long sequences in overlaping fragments to speedup computations"))
options.StringVar(&_ForwardPrimer, "forward", "", options.StringVar(&_ForwardPrimer, "forward", "",
options.Required("You must provide a forward primer"), options.Required("You must provide a forward primer"),
options.Description("The forward primer used for the electronic PCR.")) options.Description("The forward primer used for the electronic PCR."))
@ -46,6 +52,7 @@ func PCROptionSet(options *getoptions.GetOpt) {
options.Description("Minimum length of the barcode (primers excluded).")) options.Description("Minimum length of the barcode (primers excluded)."))
options.IntVar(&_MaximumLength, "max-length", -1, options.IntVar(&_MaximumLength, "max-length", -1,
options.Alias("L"), options.Alias("L"),
options.Required("You must indicate the maximum size of the amplicon (excluded primer length)"),
options.Description("Maximum length of the barcode (primers excluded).")) options.Description("Maximum length of the barcode (primers excluded)."))
} }
@ -56,11 +63,11 @@ func OptionSet(options *getoptions.GetOpt) {
PCROptionSet(options) PCROptionSet(options)
} }
// ForwardPrimer returns the sequence of the forward primer as indicated by the // CLIForwardPrimer returns the sequence of the forward primer as indicated by the
// --forward command line option // --forward command line option
func ForwardPrimer() string { func CLIForwardPrimer() string {
pattern, err := obiapat.MakeApatPattern(_ForwardPrimer, _AllowedMismatch, false) pattern, err := obiapat.MakeApatPattern(_ForwardPrimer, _AllowedMismatch, false)
if err != nil { if err != nil {
log.Fatalf("%+v", err) log.Fatalf("%+v", err)
} }
@ -70,10 +77,10 @@ func ForwardPrimer() string {
return _ForwardPrimer return _ForwardPrimer
} }
// ReversePrimer returns the sequence of the reverse primer as indicated by the // CLIReversePrimer returns the sequence of the reverse primer as indicated by the
// --reverse command line option // --reverse command line option
func ReversePrimer() string { func CLIReversePrimer() string {
pattern, err := obiapat.MakeApatPattern(_ReversePrimer, _AllowedMismatch,false) pattern, err := obiapat.MakeApatPattern(_ReversePrimer, _AllowedMismatch, false)
if err != nil { if err != nil {
log.Fatalf("%+v", err) log.Fatalf("%+v", err)
@ -84,27 +91,31 @@ func ReversePrimer() string {
return _ReversePrimer return _ReversePrimer
} }
// AllowedMismatch returns the allowed mistmatch count between each // CLIAllowedMismatch returns the allowed mistmatch count between each
// primer and the sequences as indicated by the // primer and the sequences as indicated by the
// --allowed-mismatches|-e command line option // --allowed-mismatches|-e command line option
func AllowedMismatch() int { func CLIAllowedMismatch() int {
return _AllowedMismatch return _AllowedMismatch
} }
// Circular returns the considered sequence topology as indicated by the // CLICircular returns the considered sequence topology as indicated by the
// --circular|-c command line option // --circular|-c command line option
func Circular() bool { func CLICircular() bool {
return _Circular return _Circular
} }
// MinLength returns the amplicon minimum length as indicated by the // CLIMinLength returns the amplicon minimum length as indicated by the
// --min-length|-l command line option // --min-length|-l command line option
func MinLength() int { func CLIMinLength() int {
return _MinimumLength return _MinimumLength
} }
// MaxLength returns the amplicon maximum length as indicated by the // CLIMaxLength returns the amplicon maximum length as indicated by the
// --max-length|-L command line option // --max-length|-L command line option
func MaxLength() int { func CLIMaxLength() int {
return _MaximumLength return _MaximumLength
} }
func CLIFragmented() bool {
return _Fragmented
}

View File

@ -4,39 +4,56 @@ import (
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiapat" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiapat"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils"
log "github.com/sirupsen/logrus"
) )
// PCR iterates over sequences provided by a obiseq.IBioSequenceBatch // CLIPCR iterates over sequences provided by a obiseq.IBioSequenceBatch
// and returns an other obiseq.IBioSequenceBatch distributing // and returns an other obiseq.IBioSequenceBatch distributing
// obiseq.BioSequenceBatch containing the selected amplicon sequences. // obiseq.BioSequenceBatch containing the selected amplicon sequences.
func PCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) { func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) {
opts := make([]obiapat.WithOption, 0, 10) opts := make([]obiapat.WithOption, 0, 10)
opts = append(opts, opts = append(opts,
obiapat.OptionForwardPrimer( obiapat.OptionForwardPrimer(
ForwardPrimer(), CLIForwardPrimer(),
AllowedMismatch(), CLIAllowedMismatch(),
), ),
obiapat.OptionReversePrimer( obiapat.OptionReversePrimer(
ReversePrimer(), CLIReversePrimer(),
AllowedMismatch(), CLIAllowedMismatch(),
), ),
) )
if MinLength() > 0 { if CLIMinLength() > 0 {
opts = append(opts, obiapat.OptionMinLength(MinLength())) opts = append(opts, obiapat.OptionMinLength(CLIMinLength()))
} }
if MaxLength() > 0 { opts = append(opts, obiapat.OptionMaxLength(CLIMaxLength()))
opts = append(opts, obiapat.OptionMaxLength(MaxLength()))
}
if Circular() { if CLICircular() {
opts = append(opts, obiapat.OptionCircular(Circular())) opts = append(opts, obiapat.OptionCircular(CLICircular()))
} }
worker := obiapat.PCRSliceWorker(opts...) worker := obiapat.PCRSliceWorker(opts...)
if CLIFragmented() {
frags := obiiter.IFragments(
CLIMaxLength()*1000,
CLIMaxLength()*100,
CLIMaxLength()+obiutils.MaxInt(len(CLIForwardPrimer()),
len(CLIReversePrimer()))+obiutils.MinInt(len(CLIForwardPrimer()),
len(CLIReversePrimer()))/2,
100,
obioptions.CLIParallelWorkers(),
)
log.Infof("Fragmenting sequence longer than %dbp into chuncks of %dbp",
CLIMaxLength()*100000000,
CLIMaxLength()*100,
)
iterator = iterator.Pipe(frags)
}
return iterator.MakeISliceWorker(worker, obioptions.CLIParallelWorkers(), 0), nil return iterator.MakeISliceWorker(worker, obioptions.CLIParallelWorkers(), 0), nil
} }