Simplify the workers code by removing dupplicates

Former-commit-id: 638fcf8d88dd93755d1ec89c8fe92f6ed3f733df
This commit is contained in:
Eric Coissac
2024-04-30 12:22:22 +02:00
parent f37477b26c
commit d30d736e48
4 changed files with 30 additions and 79 deletions

View File

@ -1,6 +1,8 @@
package obiiter
import (
"runtime"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
@ -24,42 +26,8 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker,
nworkers = sizes[0]
}
newIter := MakeIBioSequence()
newIter.Add(nworkers)
go func() {
newIter.WaitAndClose()
log.Debugln("End of the batch workers")
}()
sw := obiseq.SeqToSliceWorker(worker, breakOnError)
f := func(iterator IBioSequence) {
var err error
for iterator.Next() {
batch := iterator.Get()
batch.slice, err = sw(batch.slice)
if err != nil && breakOnError {
log.Fatalf("Error on sequence processing : %v", err)
}
newIter.Push(batch)
}
newIter.Done()
}
log.Debugln("Start of the batch workers")
for i := 0; i < nworkers-1; i++ {
go f(iterator.Split())
}
go f(iterator)
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
return iterator.MakeISliceWorker(sw, breakOnError, nworkers)
}
// MakeIConditionalWorker applies a given worker function to each sequence in the iterator that satisfies the given predicate.
@ -80,42 +48,10 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre
nworkers = sizes[0]
}
newIter := MakeIBioSequence()
newIter.Add(nworkers)
go func() {
newIter.WaitAndClose()
log.Debugln("End of the batch workers")
}()
sw := obiseq.SeqToSliceConditionalWorker(predicate, worker, breakOnError)
f := func(iterator IBioSequence) {
var err error
for iterator.Next() {
batch := iterator.Get()
batch.slice, err = sw(batch.slice)
if err != nil && breakOnError {
log.Fatalf("Error on sequence processing : %v", err)
}
newIter.Push(batch)
}
newIter.Done()
}
return iterator.MakeISliceWorker(sw, breakOnError, nworkers)
log.Debugln("Start of the batch workers")
for i := 0; i < nworkers-1; i++ {
go f(iterator.Split())
}
go f(iterator)
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}
// MakeISliceWorker applies a SeqSliceWorker function to each slice in the IBioSequence iterator,
@ -137,13 +73,6 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, brea
newIter := MakeIBioSequence()
newIter.Add(nworkers)
go func() {
newIter.WaitAndClose()
log.Println("End of the batch slice workers")
}()
f := func(iterator IBioSequence) {
var err error
for iterator.Next() {
@ -153,16 +82,25 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, brea
log.Fatalf("Error on sequence processing : %v", err)
}
newIter.Push(batch)
runtime.GC()
}
newIter.Done()
}
log.Printf("Start of the batch slice workers on %d workers\n", nworkers)
for i := 0; i < nworkers-1; i++ {
log.Debugln("Start of the batch workers")
for i := 1; i < nworkers; i++ {
newIter.Add(1)
go f(iterator.Split())
}
newIter.Add(1)
go f(iterator)
go func() {
newIter.WaitAndClose()
log.Debugln("End of the batch workers")
}()
if iterator.IsPaired() {
newIter.MarkAsPaired()
}

View File

@ -1,6 +1,8 @@
package obiconvert
import (
"io/fs"
"os"
"path/filepath"
"strings"
@ -93,6 +95,15 @@ func CLIWriteBioSequences(iterator obiiter.IBioSequence,
newIter, err = obiformats.WriteSequencesToFile(iterator, fn, opts...)
}
} else {
log.Info("Output is done on stdout")
var s fs.FileInfo
s, err = os.Stdout.Stat()
if err != nil {
return obiiter.NilIBioSequence, err
}
log.Infof("Data is writen to %s", s.Name())
opts = append(opts, obiformats.OptionsSkipEmptySequence(CLISkipEmpty()))
switch CLIOutputFormat() {
case "fastq":

View File

@ -60,5 +60,5 @@ func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) {
iterator = iterator.Pipe(frags)
}
return iterator.MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers(), 0), nil
return iterator.MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers()), nil
}

View File

@ -293,7 +293,9 @@ func CLIAssignTaxonomy(iterator obiiter.IBioSequence,
if err == nil {
j++
} else {
log.Warnf("Taxid %d is not described in the taxonomy. Sequence %s is discared from the reference database", seq.Taxid(), seq.Id())
log.Warnf("Taxid %d is not described in the taxonomy."+
" Sequence %s is discared from the reference database",
seq.Taxid(), seq.Id())
}
}