diff --git a/Release-notes.md b/Release-notes.md index 0cd0a64..494b596 100644 --- a/Release-notes.md +++ b/Release-notes.md @@ -2,6 +2,20 @@ ## Latest changes +### CPU limitation + +- By default, *OBITools4* tries to use all the computing power available on your computer. + In some circumstances this can be problematic (e.g. if you are running on a computer + cluster managed by your university). You can limit the number of CPU cores used by + *OBITools4* by using the **--max-cpu** option or by setting the **OBIMAXCPU** environment + variable. + Some strange behaviour of *OBITools4* has been observed when users try to limit the maximum + number of usable CPU cores to one. This seems to be caused by the Go language, and it is not + obvious to get *OBITools4* to run correctly on a single core in all circumstances. Therefore, + if you ask to use a single core, **OBITools4** will print a warning message and actually set + this parameter to two cores. If you really want a single core, you can use the + **--force-one-cpu** option. But be aware that this can lead to incorrect calculations. + ### New features - A new option **--version** has been added to every obitools command. @@ -9,9 +23,25 @@ - In `obiscript` a `qualities` method has been added to retrieve or set the quality scores from a BioSequence object. +### Enhancement + +- In every *OBITools* command, the progress bars are automatically deactivated when the + standard error output is redirected. +- As Genbank and ENA:EMBL contain very large sequences, while OBITools4 is optimised for + short sequences, `obipcr` faces some problems with excessive consumption of computer + resources, especially memory. 
Several improvements in the tuning of the default `obipcr` + parameters and some new features, currently only available for FASTA and FASTQ file readers, + have been implemented to limit the memory impact of `obipcr` without changing the + computational efficiency too much. +- The logging system, and therefore its format, has been homogenized. + ### Bug - In `obitag`, correct the wrong assignation of the **obitag_bestmatch** attribute. +- In `obiclean`, the **--no-progress-bar** option deactivates all the progress bars, + not only the one displayed while data are read. +- Several corrections in reading of FASTA and FASTQ files including some code + simplifications and factorisation. ## April 2nd, 2024. Release 4.2.0 diff --git a/cmd/obitools/obipcr/main.go b/cmd/obitools/obipcr/main.go index d70ad37..8d255da 100644 --- a/cmd/obitools/obipcr/main.go +++ b/cmd/obitools/obipcr/main.go @@ -27,6 +27,8 @@ func main() { obioptions.SetWorkerPerCore(2) obioptions.SetReadWorkerPerCore(0.5) + obioptions.SetParallelFilesRead(obioptions.CLIParallelWorkers() / 4) + obioptions.SetBatchSize(10) optionParser := obioptions.GenerateOptionParser(obipcr.OptionSet) diff --git a/go.mod b/go.mod index 9007076..1f3a28c 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/kr/text v0.2.0 // indirect + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.6.1 // indirect ) diff --git a/go.sum b/go.sum index 7fc23b9..aeef478 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= github.com/mitchellh/colorstring 
v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= diff --git a/pkg/obiformats/batch_of_files_reader.go b/pkg/obiformats/batch_of_files_reader.go index 2b6d4ea..ff464fb 100644 --- a/pkg/obiformats/batch_of_files_reader.go +++ b/pkg/obiformats/batch_of_files_reader.go @@ -1,7 +1,7 @@ package obiformats import ( - "log" + log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index d2e16bd..79da6b0 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -179,7 +179,8 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { newIter.Add(1) go _ParseEmblFile(opt.Source(), entry_channel, newIter, opt.WithFeatureTable(), - opt.BatchSize(), opt.TotalSeqSize()) + opt.BatchSize(), + opt.TotalSeqSize()) } go func() { diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index 028cefd..65d6940 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -9,7 +9,6 @@ import ( "slices" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" - "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" @@ -43,7 +42,11 @@ func 
_EndOfLastFastaEntry(buffer []byte) int { func _ParseFastaFile(source string, input ChannelSeqFileChunk, - out obiiter.IBioSequence) { + out obiiter.IBioSequence, + no_order bool, + batch_size int, + chunck_order func() int, +) { var identifier string var definition string @@ -56,7 +59,7 @@ func _ParseFastaFile(source string, for chunks := range input { scanner := bufio.NewReader(chunks.raw) - sequences := make(obiseq.BioSequenceSlice, 0, 100) + sequences := make(obiseq.BioSequenceSlice, 0, batch_size) for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() { is_end_of_line := C == '\r' || C == '\n' @@ -130,6 +133,12 @@ func _ParseFastaFile(source string, s := obiseq.NewBioSequence(identifier, slices.Clone(seqBytes.Bytes()), definition) s.SetSource(source) sequences = append(sequences, s) + if no_order { + if len(sequences) == batch_size { + out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) + sequences = make(obiseq.BioSequenceSlice, 0, batch_size) + } + } state = 1 } else if !is_sep { @@ -145,8 +154,11 @@ func _ParseFastaFile(source string, } if len(sequences) > 0 { - log.Debugln("Pushing sequences") - out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences)) + if no_order { + out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) + } else { + out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences)) + } } } @@ -158,14 +170,19 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e opt := MakeOptions(options) out := obiiter.MakeIBioSequence() - source := opt.Source() + nworker := opt.ParallelWorkers() - nworker := obioptions.CLIReadParallelWorkers() chkchan := ReadSeqFileChunk(reader, _EndOfLastFastaEntry) + chunck_order := obiutils.AtomicCounter() for i := 0; i < nworker; i++ { out.Add(1) - go _ParseFastaFile(source, chkchan, out) + go _ParseFastaFile(opt.Source(), + chkchan, + out, + opt.NoOrder(), + opt.BatchSize(), + chunck_order) } go func() { diff --git 
a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index 1bb2f24..6437ad8 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -184,7 +184,11 @@ func lastFastqCut(buffer []byte) ([]byte, []byte) { func _ParseFastqFile(source string, input ChannelSeqFileChunk, out obiiter.IBioSequence, - quality_shift byte) { + quality_shift byte, + no_order bool, + batch_size int, + chunck_order func() int, +) { var identifier string var definition string @@ -311,6 +315,14 @@ func _ParseFastqFile(source string, q[i] = q[i] - quality_shift } sequences[len(sequences)-1].SetQualities(q) + + if no_order { + if len(sequences) == batch_size { + out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) + sequences = make(obiseq.BioSequenceSlice, 0, batch_size) + } + } + state = 11 } else { qualBytes.WriteByte(C) @@ -328,9 +340,13 @@ func _ParseFastqFile(source string, } if len(sequences) > 0 { - log.Debugln("Pushing sequences") - out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences)) + if no_order { + out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) + } else { + out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences)) + } } + } out.Done() @@ -341,15 +357,20 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e opt := MakeOptions(options) out := obiiter.MakeIBioSequence() - source := opt.Source() + nworker := opt.ParallelWorkers() + chunkorder := obiutils.AtomicCounter() - nworker := obioptions.CLIReadParallelWorkers() chkchan := ReadSeqFileChunk(reader, _EndOfLastFastqEntry) for i := 0; i < nworker; i++ { out.Add(1) - go _ParseFastqFile(source, chkchan, out, - byte(obioptions.InputQualityShift())) + go _ParseFastqFile(opt.Source(), + chkchan, + out, + byte(obioptions.InputQualityShift()), + opt.NoOrder(), + opt.BatchSize(), + chunkorder) } go func() { diff --git a/pkg/obiformats/fastseq_obi_header.go b/pkg/obiformats/fastseq_obi_header.go index 175e522..6dd4a1c 100644 
--- a/pkg/obiformats/fastseq_obi_header.go +++ b/pkg/obiformats/fastseq_obi_header.go @@ -3,12 +3,13 @@ package obiformats import ( "bytes" "fmt" - "log" "math" "regexp" "strconv" "strings" + log "github.com/sirupsen/logrus" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" "github.com/goccy/go-json" diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 9856431..8d7735a 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -115,7 +115,7 @@ func WriteFasta(iterator obiiter.IBioSequence, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) - iterator = iterator.Rebatch(1000) + iterator = iterator.Rebatch(opt.BatchSize()) file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) newIter := obiiter.MakeIBioSequence() diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 54741ca..b99474d 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -62,9 +62,8 @@ func WriteFastq(iterator obiiter.IBioSequence, file io.WriteCloser, options ...WithOption) (obiiter.IBioSequence, error) { - iterator = iterator.Rebatch(1000) - opt := MakeOptions(options) + iterator = iterator.Rebatch(opt.BatchSize()) file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 030037f..9dca35d 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -14,6 +14,7 @@ type __options__ struct { total_seq_size int full_file_batch bool parallel_workers int + no_order bool closefile bool appendfile bool compressed bool @@ -48,6 +49,7 @@ func MakeOptions(setters []WithOption) Options { parallel_workers: obioptions.CLIReadParallelWorkers(), batch_size: obioptions.CLIBatchSize(), total_seq_size: 1024 * 
1024 * 100, // 100 MB by default + no_order: false, full_file_batch: false, closefile: false, appendfile: false, @@ -101,6 +103,10 @@ func (opt Options) FormatFastSeqHeader() func(*obiseq.BioSequence) string { return opt.pointer.fastseq_header_writer } +func (opt Options) NoOrder() bool { + return opt.pointer.no_order +} + func (opt Options) ProgressBar() bool { return opt.pointer.with_progress_bar } @@ -205,6 +211,16 @@ func OptionsAppendFile(append bool) WithOption { return f } +func OptionNoOrder(no_order bool) WithOption { + f := WithOption(func(opt Options) { + opt.pointer.no_order = no_order + }) + + return f +} + + + func OptionsCompressed(compressed bool) WithOption { f := WithOption(func(opt Options) { opt.pointer.compressed = compressed diff --git a/pkg/obiiter/limitmemory.go b/pkg/obiiter/limitmemory.go new file mode 100644 index 0000000..6a10212 --- /dev/null +++ b/pkg/obiiter/limitmemory.go @@ -0,0 +1,47 @@ +package obiiter + +import ( + "runtime" + + "github.com/pbnjay/memory" + log "github.com/sirupsen/logrus" +) + +func (iterator IBioSequence) LimitMemory(fraction float64) IBioSequence { + newIter := MakeIBioSequence() + + fracLoad := func() float64 { + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + return float64(mem.Alloc) / float64(memory.TotalMemory()) + } + + newIter.Add(1) + go func() { + + for iterator.Next() { + nwait := 0 + for fracLoad() > fraction { + runtime.Gosched() + nwait++ + if nwait%1000 == 0 { + log.Warnf("Wait for memory limit %f/%f", fracLoad(), fraction) + + } + if nwait > 10000 { + log.Warnf("Very long wait for memory limit %f/%f", fracLoad(), fraction) + break + } + } + newIter.Push(iterator.Get()) + } + + newIter.Done() + }() + + go func() { + newIter.WaitAndClose() + }() + + return newIter +} diff --git a/pkg/obilua/lua_table.go b/pkg/obilua/lua_table.go index 61279c5..2b89c60 100644 --- a/pkg/obilua/lua_table.go +++ b/pkg/obilua/lua_table.go @@ -1,7 +1,7 @@ package obilua import ( - "log" + log 
"github.com/sirupsen/logrus" lua "github.com/yuin/gopher-lua" ) @@ -67,13 +67,13 @@ func Table2ByteSlice(interpreter *lua.LState, table *lua.LTable) []byte { v := table.RawGetInt(i) switch v.Type() { case lua.LTNumber: - if x:=float64(v.(lua.LNumber)); x <=255 { + if x := float64(v.(lua.LNumber)); x <= 255 { val[i-1] = byte(x) } else { - log.Fatalf("LUA: Value %f at index %d is to large to be converted to byte", x,i) + log.Fatalf("LUA: Value %f at index %d is to large to be converted to byte", x, i) } default: - log.Fatalf("LUA: Value %v at index %d cannot be converted to byte", v,i) + log.Fatalf("LUA: Value %v at index %d cannot be converted to byte", v, i) } } diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index f9bf573..9d7e3c2 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -16,6 +16,8 @@ import ( var _Debug = false var _WorkerPerCore = 2.0 var _ReadWorkerPerCore = 1.0 +var _StrictReadWorker = 0 +var _ParallelFilesRead = 0 var _MaxAllowedCPU = runtime.NumCPU() var _BatchSize = 5000 var _Pprof = false @@ -31,9 +33,16 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser options.SetMode(getoptions.Bundling) options.SetUnknownMode(getoptions.Fail) options.Bool("help", false, options.Alias("h", "?")) - options.Bool("version", false) - options.BoolVar(&_Debug, "debug", false) - options.BoolVar(&_Pprof, "pprof", false) + + options.Bool("version", false, + options.Description("Prints the version and exits.")) + + options.BoolVar(&_Debug, "debug", false, + options.GetEnv("OBIDEBUG"), + options.Description("Enable debug mode, by setting log level to debug.")) + + options.BoolVar(&_Pprof, "pprof", false, + options.Description("Enable pprof server. 
Look at the log for details.")) // options.IntVar(&_ParallelWorkers, "workers", _ParallelWorkers, // options.Alias("w"), @@ -43,6 +52,9 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser options.GetEnv("OBIMAXCPU"), options.Description("Number of parallele threads computing the result")) + options.Bool("force-one-cpu", false, + options.Description("Force to use only one cpu core for parallel processing")) + options.IntVar(&_BatchSize, "batch-size", _BatchSize, options.GetEnv("OBIBATCHSIZE"), options.Description("Number of sequence per batch for paralelle processing")) @@ -93,11 +105,20 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser // Setup the maximum number of CPU usable by the program if _MaxAllowedCPU == 1 { log.Warn("Limitating the Maximum number of CPU to 1 is not recommanded") - runtime.GOMAXPROCS(1) - } else { - runtime.GOMAXPROCS(_MaxAllowedCPU) + log.Warn("The number of CPU requested has been set to 2") + SetMaxCPU(2) } - if options.Called("max-cpu") { + + if options.Called("force-one-cpu") { + log.Warn("Limitating the Maximum number of CPU to 1 is not recommanded") + log.Warn("The number of CPU has been forced to 1") + log.Warn("This can lead to unexpected behavior") + SetMaxCPU(1) + } + + runtime.GOMAXPROCS(_MaxAllowedCPU) + + if options.Called("max-cpu") || options.Called("force-one-cpu") { log.Printf("CPU number limited to %d", _MaxAllowedCPU) } @@ -119,74 +140,200 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser } } -// Predicate indicating if the debug mode is activated. +// CLIIsDebugMode returns whether the CLI is in debug mode. +// +// The debug mode is activated by the command line option --debug or +// the environment variable OBIDEBUG. +// It can be activated programmatically by the SetDebugOn() function. +// +// No parameters. +// Returns a boolean indicating if the CLI is in debug mode. 
func CLIIsDebugMode() bool { return _Debug } -// CLIParallelWorkers returns the number of parallel workers requested by -// the command line option --workers|-w. +// CLIParallelWorkers returns the number of parallel workers used for +// computing the result. +// +// The number of parallel workers is determined by the command line option +// --max-cpu|-m and the environment variable OBIMAXCPU. This number is +// multiplied by the variable _WorkerPerCore. +// +// No parameters. +// Returns an integer representing the number of parallel workers. func CLIParallelWorkers() int { - return int(float64(_MaxAllowedCPU) * float64(_WorkerPerCore)) + return int(float64(CLIMaxCPU()) * float64(WorkerPerCore())) } +// CLIReadParallelWorkers returns the number of parallel workers used for +// reading files. +// +// The number of parallel workers is determined by the command line option +// --max-cpu|-m and the environment variable OBIMAXCPU. This number is +// multiplied by the variable _ReadWorkerPerCore. +// +// No parameters. +// Returns an integer representing the number of parallel workers. func CLIReadParallelWorkers() int { - return int(float64(_MaxAllowedCPU) * float64(_ReadWorkerPerCore)) + if StrictReadWorker() == 0 { + return int(float64(CLIMaxCPU()) * ReadWorkerPerCore()) + } else { + return StrictReadWorker() + } } -// CLIParallelWorkers returns the number of parallel workers requested by -// the command line option --workers|-w. +// CLIMaxCPU returns the maximum number of CPU cores allowed. +// +// The maximum number of CPU cores is determined by the command line option +// --max-cpu|-m and the environment variable OBIMAXCPU. +// +// No parameters. +// Returns an integer representing the maximum number of CPU cores allowed. func CLIMaxCPU() int { return _MaxAllowedCPU } -// CLIBatchSize returns the expeted size of the sequence batches +// CLIBatchSize returns the expected size of the sequence batches. 
+// +// In Obitools, the sequences are processed in parallel by batches. +// The number of sequence in each batch is determined by the command line option +// --batch-size and the environment variable OBIBATCHSIZE. +// +// No parameters. +// Returns an integer value. func CLIBatchSize() int { return _BatchSize } -// DebugOn sets the debug mode on. -func DebugOn() { +// SetDebugOn sets the debug mode on. +func SetDebugOn() { _Debug = true } -// DebugOff sets the debug mode off. -func DebugOff() { +// SetDebugOff sets the debug mode off. +func SetDebugOff() { _Debug = false } +// SetWorkerPerCore sets the number of workers per CPU core. +// +// It takes a float64 parameter representing the number of workers +// per CPU core and does not return any value. func SetWorkerPerCore(n float64) { _WorkerPerCore = n } +// SetReadWorkerPerCore sets the number of worker per CPU +// core for reading files. +// +// n float64 func SetReadWorkerPerCore(n float64) { _ReadWorkerPerCore = n } +// WorkerPerCore returns the number of workers per CPU core. +// +// No parameters. +// Returns a float64 representing the number of workers per CPU core. func WorkerPerCore() float64 { return _WorkerPerCore } +// ReadWorkerPerCore returns the number of worker per CPU core for +// computing the result. +// +// No parameters. +// Returns a float64 representing the number of worker per CPU core. func ReadWorkerPerCore() float64 { return _ReadWorkerPerCore } +// SetBatchSize sets the size of the sequence batches. +// +// n - an integer representing the size of the sequence batches. func SetBatchSize(n int) { _BatchSize = n } +// InputQualityShift returns the quality shift value for input. +// +// It can be set programmatically by the SetInputQualityShift() function. +// This value is used to decode the quality scores in FASTQ files. +// The quality shift value defaults to 33, which is the correct value for +// Sanger formated FASTQ files. 
+// The quality shift value can be modified to 64 by the command line option +// --solexa, for decoding old Solexa formated FASTQ files. +// +// No parameters. +// Returns an integer representing the quality shift value for input. func InputQualityShift() int { return _Quality_Shift_Input } +// OutputQualityShift returns the quality shift value used for FASTQ output. +// +// No parameters. +// Returns an integer representing the quality shift value for output. func OutputQualityShift() int { return _Quality_Shift_Output } +// SetInputQualityShift sets the quality shift value for decoding FASTQ. +// +// n - an integer representing the quality shift value to be set. func SetInputQualityShift(n int) { _Quality_Shift_Input = n } +// SetOutputQualityShift sets the quality shift value used for FASTQ output. +// +// n - an integer representing the quality shift value to be set. func SetOutputQualityShift(n int) { _Quality_Shift_Output = n } + +// SetMaxCPU sets the maximum number of CPU cores allowed. +// +// n - an integer representing the new maximum number of CPU cores. +func SetMaxCPU(n int) { + _MaxAllowedCPU = n +} + +// SetStrictReadWorker sets the number of workers for reading files. +// +// The number of worker dedicated to reading files is determined +// as the number of allowed CPU cores multiplied by number of read workers per core. +// Setting the number of read workers using this function allows to decouple the number +// of read workers from the number of CPU cores. +// +// n - an integer representing the number of workers to be set. +func SetStrictReadWorker(n int) { + _StrictReadWorker = n +} + +// StrictReadWorker returns the number of workers for reading files. +// +// No parameters. +// Returns an integer representing the number of workers. +func StrictReadWorker() int { + return _StrictReadWorker +} + +// ParallelFilesRead returns the number of files to be read in parallel. +// +// No parameters. +// Returns an integer representing the number of files to be read. 
+func ParallelFilesRead() int { + if _ParallelFilesRead == 0 { + return CLIParallelWorkers() + } else { + return _ParallelFilesRead + } +} + +// SetParallelFilesRead sets the number of files to be read in parallel. +// +// n - an integer representing the number of files to be set. +func SetParallelFilesRead(n int) { + _ParallelFilesRead = n +} diff --git a/pkg/obiseq/language.go b/pkg/obiseq/language.go index 4eeaf2a..3200d48 100644 --- a/pkg/obiseq/language.go +++ b/pkg/obiseq/language.go @@ -2,10 +2,11 @@ package obiseq import ( "fmt" - "log" "reflect" "strings" + log "github.com/sirupsen/logrus" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" "github.com/PaesslerAG/gval" ) @@ -38,8 +39,6 @@ import ( // return float64(m) // } - - // return float64(m) // } diff --git a/pkg/obiseq/pool.go b/pkg/obiseq/pool.go index 204518b..c31c6f3 100644 --- a/pkg/obiseq/pool.go +++ b/pkg/obiseq/pool.go @@ -1,9 +1,10 @@ package obiseq import ( - "log" "sync" + log "github.com/sirupsen/logrus" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index 99ddff7..2d91cf3 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -136,7 +136,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { nreader := 1 if CLINoInputOrder() { - nreader = obioptions.CLIParallelWorkers() + nreader = obioptions.ParallelFilesRead() } iterator = obiformats.ReadSequencesBatchFromFiles( diff --git a/pkg/obitools/obicsv/obicsv.go b/pkg/obitools/obicsv/obicsv.go index 980b9b6..2014617 100644 --- a/pkg/obitools/obicsv/obicsv.go +++ b/pkg/obitools/obicsv/obicsv.go @@ -1,7 +1,7 @@ package obicsv import ( - "log" + log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" diff --git 
a/pkg/obitools/obigrep/grep.go b/pkg/obitools/obigrep/grep.go index fdaa188..453d40e 100644 --- a/pkg/obitools/obigrep/grep.go +++ b/pkg/obitools/obigrep/grep.go @@ -1,7 +1,7 @@ package obigrep import ( - "log" + log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index e9f03d9..3fbf18d 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -60,5 +60,5 @@ func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) { iterator = iterator.Pipe(frags) } - return iterator.MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers()), nil + return iterator.LimitMemory(0.5).MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers()), nil } diff --git a/pkg/obitools/obirefidx/geomindexing.go b/pkg/obitools/obirefidx/geomindexing.go index 2a4157c..c80d1c7 100644 --- a/pkg/obitools/obirefidx/geomindexing.go +++ b/pkg/obitools/obirefidx/geomindexing.go @@ -2,7 +2,7 @@ package obirefidx import ( "fmt" - "log" + log "github.com/sirupsen/logrus" "sort" "sync" diff --git a/pkg/obitools/obirefidx/obirefidx.go b/pkg/obitools/obirefidx/obirefidx.go index 39d1c15..6509028 100644 --- a/pkg/obitools/obirefidx/obirefidx.go +++ b/pkg/obitools/obirefidx/obirefidx.go @@ -221,5 +221,5 @@ func IndexReferenceDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { go f() } - return indexed.Rebatch(1000) + return indexed.Rebatch(obioptions.CLIBatchSize()) } diff --git a/pkg/obitools/obiscript/options.go b/pkg/obitools/obiscript/options.go index 338b59b..3526bb6 100644 --- a/pkg/obitools/obiscript/options.go +++ b/pkg/obitools/obiscript/options.go @@ -1,7 +1,8 @@ package obiscript import ( - "log" + log "github.com/sirupsen/logrus" + "os" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" diff --git a/pkg/obitools/obisplit/options.go 
b/pkg/obitools/obisplit/options.go index 5e94961..5b56774 100644 --- a/pkg/obitools/obisplit/options.go +++ b/pkg/obitools/obisplit/options.go @@ -3,7 +3,7 @@ package obisplit import ( "encoding/csv" "fmt" - "log" + log "github.com/sirupsen/logrus" "os" "slices" diff --git a/pkg/obitools/obitag/options.go b/pkg/obitools/obitag/options.go index 0e074bc..f9dbb2a 100644 --- a/pkg/obitools/obitag/options.go +++ b/pkg/obitools/obitag/options.go @@ -1,7 +1,7 @@ package obitag import ( - "log" + log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" diff --git a/pkg/obiutils/cast_interface.go b/pkg/obiutils/cast_interface.go index 98319cf..3835657 100644 --- a/pkg/obiutils/cast_interface.go +++ b/pkg/obiutils/cast_interface.go @@ -2,8 +2,9 @@ package obiutils import ( "fmt" - "log" "reflect" + + log "github.com/sirupsen/logrus" ) // InterfaceToString converts an interface value to a string.