Update of obipcr and homogenization of logging

Former-commit-id: 46abf47c19ace5248042c02cf1f81d9f6c12eb10
This commit is contained in:
Eric Coissac
2024-05-16 15:18:30 +02:00
parent 61be8a55b1
commit 55ce36f329
27 changed files with 345 additions and 58 deletions

View File

@ -1,7 +1,7 @@
package obiformats
import (
"log"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"

View File

@ -179,7 +179,8 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
newIter.Add(1)
go _ParseEmblFile(opt.Source(), entry_channel, newIter,
opt.WithFeatureTable(),
opt.BatchSize(), opt.TotalSeqSize())
opt.BatchSize(),
opt.TotalSeqSize())
}
go func() {

View File

@ -9,7 +9,6 @@ import (
"slices"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
@ -43,7 +42,11 @@ func _EndOfLastFastaEntry(buffer []byte) int {
func _ParseFastaFile(source string,
input ChannelSeqFileChunk,
out obiiter.IBioSequence) {
out obiiter.IBioSequence,
no_order bool,
batch_size int,
chunck_order func() int,
) {
var identifier string
var definition string
@ -56,7 +59,7 @@ func _ParseFastaFile(source string,
for chunks := range input {
scanner := bufio.NewReader(chunks.raw)
sequences := make(obiseq.BioSequenceSlice, 0, 100)
sequences := make(obiseq.BioSequenceSlice, 0, batch_size)
for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() {
is_end_of_line := C == '\r' || C == '\n'
@ -130,6 +133,12 @@ func _ParseFastaFile(source string,
s := obiseq.NewBioSequence(identifier, slices.Clone(seqBytes.Bytes()), definition)
s.SetSource(source)
sequences = append(sequences, s)
if no_order {
if len(sequences) == batch_size {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
sequences = make(obiseq.BioSequenceSlice, 0, batch_size)
}
}
state = 1
} else if !is_sep {
@ -145,8 +154,11 @@ func _ParseFastaFile(source string,
}
if len(sequences) > 0 {
log.Debugln("Pushing sequences")
out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences))
if no_order {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
} else {
out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences))
}
}
}
@ -158,14 +170,19 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
opt := MakeOptions(options)
out := obiiter.MakeIBioSequence()
source := opt.Source()
nworker := opt.ParallelWorkers()
nworker := obioptions.CLIReadParallelWorkers()
chkchan := ReadSeqFileChunk(reader, _EndOfLastFastaEntry)
chunck_order := obiutils.AtomicCounter()
for i := 0; i < nworker; i++ {
out.Add(1)
go _ParseFastaFile(source, chkchan, out)
go _ParseFastaFile(opt.Source(),
chkchan,
out,
opt.NoOrder(),
opt.BatchSize(),
chunck_order)
}
go func() {

View File

@ -184,7 +184,11 @@ func lastFastqCut(buffer []byte) ([]byte, []byte) {
func _ParseFastqFile(source string,
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
quality_shift byte) {
quality_shift byte,
no_order bool,
batch_size int,
chunck_order func() int,
) {
var identifier string
var definition string
@ -311,6 +315,14 @@ func _ParseFastqFile(source string,
q[i] = q[i] - quality_shift
}
sequences[len(sequences)-1].SetQualities(q)
if no_order {
if len(sequences) == batch_size {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
sequences = make(obiseq.BioSequenceSlice, 0, batch_size)
}
}
state = 11
} else {
qualBytes.WriteByte(C)
@ -328,9 +340,13 @@ func _ParseFastqFile(source string,
}
if len(sequences) > 0 {
log.Debugln("Pushing sequences")
out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences))
if no_order {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
} else {
out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences))
}
}
}
out.Done()
@ -341,15 +357,20 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
opt := MakeOptions(options)
out := obiiter.MakeIBioSequence()
source := opt.Source()
nworker := opt.ParallelWorkers()
chunkorder := obiutils.AtomicCounter()
nworker := obioptions.CLIReadParallelWorkers()
chkchan := ReadSeqFileChunk(reader, _EndOfLastFastqEntry)
for i := 0; i < nworker; i++ {
out.Add(1)
go _ParseFastqFile(source, chkchan, out,
byte(obioptions.InputQualityShift()))
go _ParseFastqFile(opt.Source(),
chkchan,
out,
byte(obioptions.InputQualityShift()),
opt.NoOrder(),
opt.BatchSize(),
chunkorder)
}
go func() {

View File

@ -3,12 +3,13 @@ package obiformats
import (
"bytes"
"fmt"
"log"
"math"
"regexp"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
"github.com/goccy/go-json"

View File

@ -115,7 +115,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options)
iterator = iterator.Rebatch(1000)
iterator = iterator.Rebatch(opt.BatchSize())
file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile())
newIter := obiiter.MakeIBioSequence()

View File

@ -62,9 +62,8 @@ func WriteFastq(iterator obiiter.IBioSequence,
file io.WriteCloser,
options ...WithOption) (obiiter.IBioSequence, error) {
iterator = iterator.Rebatch(1000)
opt := MakeOptions(options)
iterator = iterator.Rebatch(opt.BatchSize())
file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile())

View File

@ -14,6 +14,7 @@ type __options__ struct {
total_seq_size int
full_file_batch bool
parallel_workers int
no_order bool
closefile bool
appendfile bool
compressed bool
@ -48,6 +49,7 @@ func MakeOptions(setters []WithOption) Options {
parallel_workers: obioptions.CLIReadParallelWorkers(),
batch_size: obioptions.CLIBatchSize(),
total_seq_size: 1024 * 1024 * 100, // 100 MB by default
no_order: false,
full_file_batch: false,
closefile: false,
appendfile: false,
@ -101,6 +103,10 @@ func (opt Options) FormatFastSeqHeader() func(*obiseq.BioSequence) string {
return opt.pointer.fastseq_header_writer
}
// NoOrder reports whether the no_order option is set (false by default).
//
// ReadFasta and ReadFastq hand this flag to their parsers: when it is true
// a batch is pushed as soon as it holds BatchSize() sequences and batches
// are numbered with a shared counter instead of the order of the chunk
// they were read from.
func (opt Options) NoOrder() bool {
return opt.pointer.no_order
}
// ProgressBar reports the value of the with_progress_bar option.
func (opt Options) ProgressBar() bool {
return opt.pointer.with_progress_bar
}
@ -205,6 +211,16 @@ func OptionsAppendFile(append bool) WithOption {
return f
}
// OptionNoOrder builds the option setter that records no_order in the
// options structure; the stored value is the one reported by
// Options.NoOrder.
//
// no_order - true to release the ordering of the batches.
func OptionNoOrder(no_order bool) WithOption {
	return WithOption(func(opt Options) {
		opt.pointer.no_order = no_order
	})
}
func OptionsCompressed(compressed bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.compressed = compressed

View File

@ -0,0 +1,47 @@
package obiiter
import (
"runtime"
"github.com/pbnjay/memory"
log "github.com/sirupsen/logrus"
)
// LimitMemory returns a new iterator forwarding every batch of the receiver,
// but holding a batch back while the allocated heap (runtime.MemStats.Alloc)
// exceeds the given fraction of the total system memory.
//
// The wait is bounded: the goroutine yields the processor between two
// measurements, logs a warning every 1000 yields and, after more than
// 10000 yields, forwards the batch anyway.
//
// When the total memory of the system cannot be determined no throttling
// is applied.
//
// fraction - the maximum heap size, as a fraction of the total memory.
// Returns the throttled iterator.
func (iterator IBioSequence) LimitMemory(fraction float64) IBioSequence {
	newIter := MakeIBioSequence()

	// The total memory is queried once instead of on every poll of the
	// waiting loop.
	totalMemory := float64(memory.TotalMemory())

	fracLoad := func() float64 {
		// memory.TotalMemory returns 0 when the size is unknown. Dividing by
		// it would give +Inf and make every batch wait for the full 10000
		// yields, so report "no load" instead.
		if totalMemory == 0 {
			return 0
		}

		var mem runtime.MemStats
		runtime.ReadMemStats(&mem)
		return float64(mem.Alloc) / totalMemory
	}

	newIter.Add(1)

	go func() {
		for iterator.Next() {
			nwait := 0
			// The measured load is kept so that the log messages report the
			// value that actually triggered the wait, without calling
			// ReadMemStats a second time.
			for load := fracLoad(); load > fraction; load = fracLoad() {
				runtime.Gosched()
				nwait++
				if nwait%1000 == 0 {
					log.Warnf("Wait for memory limit %f/%f", load, fraction)
				}
				if nwait > 10000 {
					log.Warnf("Very long wait for memory limit %f/%f", load, fraction)
					break
				}
			}
			newIter.Push(iterator.Get())
		}

		newIter.Done()
	}()

	go func() {
		newIter.WaitAndClose()
	}()

	return newIter
}

View File

@ -1,7 +1,7 @@
package obilua
import (
"log"
log "github.com/sirupsen/logrus"
lua "github.com/yuin/gopher-lua"
)
@ -67,13 +67,13 @@ func Table2ByteSlice(interpreter *lua.LState, table *lua.LTable) []byte {
v := table.RawGetInt(i)
switch v.Type() {
case lua.LTNumber:
if x:=float64(v.(lua.LNumber)); x <=255 {
if x := float64(v.(lua.LNumber)); x <= 255 {
val[i-1] = byte(x)
} else {
log.Fatalf("LUA: Value %f at index %d is to large to be converted to byte", x,i)
log.Fatalf("LUA: Value %f at index %d is to large to be converted to byte", x, i)
}
default:
log.Fatalf("LUA: Value %v at index %d cannot be converted to byte", v,i)
log.Fatalf("LUA: Value %v at index %d cannot be converted to byte", v, i)
}
}

View File

@ -16,6 +16,8 @@ import (
var _Debug = false
var _WorkerPerCore = 2.0
var _ReadWorkerPerCore = 1.0
var _StrictReadWorker = 0
var _ParallelFilesRead = 0
var _MaxAllowedCPU = runtime.NumCPU()
var _BatchSize = 5000
var _Pprof = false
@ -31,9 +33,16 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
options.SetMode(getoptions.Bundling)
options.SetUnknownMode(getoptions.Fail)
options.Bool("help", false, options.Alias("h", "?"))
options.Bool("version", false)
options.BoolVar(&_Debug, "debug", false)
options.BoolVar(&_Pprof, "pprof", false)
options.Bool("version", false,
options.Description("Prints the version and exits."))
options.BoolVar(&_Debug, "debug", false,
options.GetEnv("OBIDEBUG"),
options.Description("Enable debug mode, by setting log level to debug."))
options.BoolVar(&_Pprof, "pprof", false,
options.Description("Enable pprof server. Look at the log for details."))
// options.IntVar(&_ParallelWorkers, "workers", _ParallelWorkers,
// options.Alias("w"),
@ -43,6 +52,9 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
options.GetEnv("OBIMAXCPU"),
options.Description("Number of parallele threads computing the result"))
// --force-one-cpu is only tested through options.Called("force-one-cpu"),
// so it needs no backing variable. Binding it to _Pprof would switch the
// pprof server on whenever the flag is given.
options.Bool("force-one-cpu", false,
	options.Description("Force to use only one cpu core for parallel processing"))
options.IntVar(&_BatchSize, "batch-size", _BatchSize,
options.GetEnv("OBIBATCHSIZE"),
options.Description("Number of sequence per batch for paralelle processing"))
@ -93,11 +105,20 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
// Setup the maximum number of CPU usable by the program
if _MaxAllowedCPU == 1 {
log.Warn("Limitating the Maximum number of CPU to 1 is not recommanded")
runtime.GOMAXPROCS(1)
} else {
runtime.GOMAXPROCS(_MaxAllowedCPU)
log.Warn("The number of CPU requested has been set to 2")
SetMaxCPU(2)
}
if options.Called("max-cpu") {
if options.Called("force-one-cpu") {
log.Warn("Limitating the Maximum number of CPU to 1 is not recommanded")
log.Warn("The number of CPU has been forced to 1")
log.Warn("This can lead to unexpected behavior")
SetMaxCPU(1)
}
runtime.GOMAXPROCS(_MaxAllowedCPU)
if options.Called("max-cpu") || options.Called("force-one-cpu") {
log.Printf("CPU number limited to %d", _MaxAllowedCPU)
}
@ -119,74 +140,200 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
}
}
// CLIIsDebugMode returns whether the CLI is in debug mode.
//
// The debug mode is activated by the command line option --debug or
// the environment variable OBIDEBUG.
// It can be activated programmatically by the SetDebugOn() function.
//
// No parameters.
// Returns a boolean indicating if the CLI is in debug mode.
func CLIIsDebugMode() bool {
return _Debug
}
// CLIParallelWorkers returns the number of parallel workers requested by
// the command line option --workers|-w.
// CLIParallelWorkers returns the number of parallel workers used for
// computing the result.
//
// The number of parallel workers is determined by the command line option
// --max-cpu|-m and the environment variable OBIMAXCPU. This number is
// multiplied by the variable _WorkerPerCore.
//
// No parameters.
// Returns an integer representing the number of parallel workers.
func CLIParallelWorkers() int {
return int(float64(_MaxAllowedCPU) * float64(_WorkerPerCore))
return int(float64(CLIMaxCPU()) * float64(WorkerPerCore()))
}
// CLIReadParallelWorkers returns the number of parallel workers used for
// reading files.
//
// The number of parallel workers is determined by the command line option
// --max-cpu|-m and the environment variable OBIMAXCPU. This number is
// multiplied by the variable _ReadWorkerPerCore.
//
// No parameters.
// Returns an integer representing the number of parallel workers.
func CLIReadParallelWorkers() int {
return int(float64(_MaxAllowedCPU) * float64(_ReadWorkerPerCore))
if StrictReadWorker() == 0 {
return int(float64(CLIMaxCPU()) * ReadWorkerPerCore())
} else {
return StrictReadWorker()
}
}
// CLIMaxCPU returns the maximum number of CPU cores allowed.
//
// The maximum number of CPU cores is determined by the command line option
// --max-cpu|-m and the environment variable OBIMAXCPU.
// It can also be set programmatically by the SetMaxCPU() function.
//
// No parameters.
// Returns an integer representing the maximum number of CPU cores allowed.
func CLIMaxCPU() int {
return _MaxAllowedCPU
}
// CLIBatchSize returns the expected size of the sequence batches.
//
// In Obitools, the sequences are processed in parallel by batches.
// The number of sequences in each batch is determined by the command line
// option --batch-size and the environment variable OBIBATCHSIZE.
// It can also be set programmatically by the SetBatchSize() function.
//
// No parameters.
// Returns an integer value.
func CLIBatchSize() int {
return _BatchSize
}
// DebugOn sets the debug mode on.
func DebugOn() {
// SetDebugOn sets the debug mode on.
func SetDebugOn() {
_Debug = true
}
// DebugOff sets the debug mode off.
func DebugOff() {
// SetDebugOff sets the debug mode off.
func SetDebugOff() {
_Debug = false
}
// SetWorkerPerCore sets the number of workers per CPU core.
//
// The value is the factor applied to the maximum number of CPU cores
// when the number of computing workers is evaluated.
//
// n - a float64 representing the number of workers per CPU core.
func SetWorkerPerCore(n float64) {
_WorkerPerCore = n
}
// SetReadWorkerPerCore sets the number of workers per CPU
// core for reading files.
//
// n - a float64 representing the number of read workers per CPU core.
func SetReadWorkerPerCore(n float64) {
_ReadWorkerPerCore = n
}
// WorkerPerCore returns the number of workers per CPU core used for
// computing the result.
//
// No parameters.
// Returns a float64 representing the number of workers per CPU core.
func WorkerPerCore() float64 {
return _WorkerPerCore
}
// ReadWorkerPerCore returns the number of workers per CPU core for
// reading files.
//
// No parameters.
// Returns a float64 representing the number of read workers per CPU core.
func ReadWorkerPerCore() float64 {
return _ReadWorkerPerCore
}
// SetBatchSize sets the size of the sequence batches, the value
// returned by CLIBatchSize().
//
// n - an integer representing the size of the sequence batches.
func SetBatchSize(n int) {
_BatchSize = n
}
// InputQualityShift returns the quality shift value for input.
//
// It can be set programmatically by the SetInputQualityShift() function.
// This value is used to decode the quality scores in FASTQ files.
// The quality shift value defaults to 33, which is the correct value for
// Sanger formatted FASTQ files.
// The quality shift value can be modified to 64 by the command line option
// --solexa, for decoding old Solexa formatted FASTQ files.
//
// No parameters.
// Returns an integer representing the quality shift value for input.
func InputQualityShift() int {
return _Quality_Shift_Input
}
// OutputQualityShift returns the quality shift value used for FASTQ output.
//
// It can be set programmatically by the SetOutputQualityShift() function.
//
// No parameters.
// Returns an integer representing the quality shift value for output.
func OutputQualityShift() int {
return _Quality_Shift_Output
}
// SetInputQualityShift sets the quality shift value for decoding FASTQ,
// the value returned by InputQualityShift().
//
// n - an integer representing the quality shift value to be set.
func SetInputQualityShift(n int) {
_Quality_Shift_Input = n
}
// SetOutputQualityShift sets the quality shift value used for FASTQ output,
// the value returned by OutputQualityShift().
//
// n - an integer representing the quality shift value to be set.
func SetOutputQualityShift(n int) {
_Quality_Shift_Output = n
}
// SetMaxCPU sets the maximum number of CPU cores allowed.
//
// Only the stored value, the one returned by CLIMaxCPU(), is updated:
// this function does not call runtime.GOMAXPROCS itself.
//
// n - an integer representing the new maximum number of CPU cores.
func SetMaxCPU(n int) {
_MaxAllowedCPU = n
}
// SetStrictReadWorker sets the number of workers for reading files.
//
// By default the number of workers dedicated to reading files is the
// number of allowed CPU cores multiplied by the number of read workers
// per core. Setting the number of read workers using this function
// decouples the number of read workers from the number of CPU cores.
//
// n - an integer representing the number of workers to be set.
func SetStrictReadWorker(n int) {
_StrictReadWorker = n
}
// StrictReadWorker returns the number of workers for reading files
// set by SetStrictReadWorker(). Its initial value is 0.
//
// No parameters.
// Returns an integer representing the number of workers.
func StrictReadWorker() int {
return _StrictReadWorker
}
// ParallelFilesRead returns the number of files to be read in parallel.
//
// A value set with SetParallelFilesRead() is returned as is. While that
// value is 0, the number of parallel workers given by CLIParallelWorkers()
// is returned instead.
//
// No parameters.
// Returns an integer representing the number of files to be read.
func ParallelFilesRead() int {
	if _ParallelFilesRead != 0 {
		return _ParallelFilesRead
	}

	return CLIParallelWorkers()
}
// SetParallelFilesRead sets the number of files to be read in parallel.
//
// Setting 0 makes ParallelFilesRead() fall back to the value of
// CLIParallelWorkers().
//
// n - an integer representing the number of files to be set.
func SetParallelFilesRead(n int) {
_ParallelFilesRead = n
}

View File

@ -2,10 +2,11 @@ package obiseq
import (
"fmt"
"log"
"reflect"
"strings"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
"github.com/PaesslerAG/gval"
)
@ -38,8 +39,6 @@ import (
// return float64(m)
// }
// return float64(m)
// }

View File

@ -1,9 +1,10 @@
package obiseq
import (
"log"
"sync"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)

View File

@ -136,7 +136,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) {
nreader := 1
if CLINoInputOrder() {
nreader = obioptions.CLIParallelWorkers()
nreader = obioptions.ParallelFilesRead()
}
iterator = obiformats.ReadSequencesBatchFromFiles(

View File

@ -1,7 +1,7 @@
package obicsv
import (
"log"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"

View File

@ -1,7 +1,7 @@
package obigrep
import (
"log"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"

View File

@ -60,5 +60,5 @@ func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) {
iterator = iterator.Pipe(frags)
}
return iterator.MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers()), nil
return iterator.LimitMemory(0.5).MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers()), nil
}

View File

@ -2,7 +2,7 @@ package obirefidx
import (
"fmt"
"log"
log "github.com/sirupsen/logrus"
"sort"
"sync"

View File

@ -221,5 +221,5 @@ func IndexReferenceDB(iterator obiiter.IBioSequence) obiiter.IBioSequence {
go f()
}
return indexed.Rebatch(1000)
return indexed.Rebatch(obioptions.CLIBatchSize())
}

View File

@ -1,7 +1,8 @@
package obiscript
import (
"log"
log "github.com/sirupsen/logrus"
"os"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"

View File

@ -3,7 +3,7 @@ package obisplit
import (
"encoding/csv"
"fmt"
"log"
log "github.com/sirupsen/logrus"
"os"
"slices"

View File

@ -1,7 +1,7 @@
package obitag
import (
"log"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"

View File

@ -2,8 +2,9 @@ package obiutils
import (
"fmt"
"log"
"reflect"
log "github.com/sirupsen/logrus"
)
// InterfaceToString converts an interface value to a string.