diff --git a/pkg/obiiter/workers.go b/pkg/obiiter/workers.go index 257cdae..1ff72d9 100644 --- a/pkg/obiiter/workers.go +++ b/pkg/obiiter/workers.go @@ -1,6 +1,8 @@ package obiiter import ( + "runtime" + log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" @@ -24,42 +26,8 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, nworkers = sizes[0] } - newIter := MakeIBioSequence() - - newIter.Add(nworkers) - - go func() { - newIter.WaitAndClose() - log.Debugln("End of the batch workers") - - }() - sw := obiseq.SeqToSliceWorker(worker, breakOnError) - - f := func(iterator IBioSequence) { - var err error - for iterator.Next() { - batch := iterator.Get() - batch.slice, err = sw(batch.slice) - if err != nil && breakOnError { - log.Fatalf("Error on sequence processing : %v", err) - } - newIter.Push(batch) - } - newIter.Done() - } - - log.Debugln("Start of the batch workers") - for i := 0; i < nworkers-1; i++ { - go f(iterator.Split()) - } - go f(iterator) - - if iterator.IsPaired() { - newIter.MarkAsPaired() - } - - return newIter + return iterator.MakeISliceWorker(sw, breakOnError, nworkers) } // MakeIConditionalWorker applies a given worker function to each sequence in the iterator that satisfies the given predicate. @@ -80,42 +48,10 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre nworkers = sizes[0] } - newIter := MakeIBioSequence() - - newIter.Add(nworkers) - - go func() { - newIter.WaitAndClose() - log.Debugln("End of the batch workers") - - }() - sw := obiseq.SeqToSliceConditionalWorker(predicate, worker, breakOnError) - f := func(iterator IBioSequence) { - var err error - for iterator.Next() { - batch := iterator.Get() - batch.slice, err = sw(batch.slice) - if err != nil && breakOnError { - log.Fatalf("Error on sequence processing : %v", err) - } - newIter.Push(batch) - } - newIter.Done() - } + return iterator.MakeISliceWorker(sw, breakOnError, nworkers) - log.Debugln("Start of the batch workers") - for i := 0; i < nworkers-1; i++ { - go f(iterator.Split()) - } - go f(iterator) - - if iterator.IsPaired() { - newIter.MarkAsPaired() - } - - return newIter } // MakeISliceWorker applies a SeqSliceWorker function to each slice in the IBioSequence iterator, @@ -137,13 +73,6 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, brea newIter := MakeIBioSequence() - newIter.Add(nworkers) - - go func() { - newIter.WaitAndClose() - log.Println("End of the batch slice workers") - }() - f := func(iterator IBioSequence) { var err error for iterator.Next() { @@ -153,16 +82,25 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, brea log.Fatalf("Error on sequence processing : %v", err) } newIter.Push(batch) + runtime.GC() } newIter.Done() } - log.Printf("Start of the batch slice workers on %d workers\n", nworkers) - for i := 0; i < nworkers-1; i++ { + log.Debugln("Start of the batch workers") + for i := 1; i < nworkers; i++ { + newIter.Add(1) go f(iterator.Split()) } + newIter.Add(1) go f(iterator) + go func() { + newIter.WaitAndClose() + log.Debugln("End of the batch workers") + + }() + if iterator.IsPaired() { newIter.MarkAsPaired() } diff --git a/pkg/obitools/obiconvert/sequence_writer.go b/pkg/obitools/obiconvert/sequence_writer.go index 308a6c9..52e2e50 100644 --- a/pkg/obitools/obiconvert/sequence_writer.go +++ b/pkg/obitools/obiconvert/sequence_writer.go @@ -1,6 +1,8 @@ package obiconvert import ( + "io/fs" + "os" "path/filepath" "strings" @@ -93,6 +95,15 @@ func CLIWriteBioSequences(iterator obiiter.IBioSequence, newIter, err = obiformats.WriteSequencesToFile(iterator, fn, opts...) } } else { + log.Info("Output is done on stdout") + var s fs.FileInfo + s, err = os.Stdout.Stat() + if err != nil { + return obiiter.NilIBioSequence, err + } + + log.Infof("Data is writen to %s", s.Name()) + opts = append(opts, obiformats.OptionsSkipEmptySequence(CLISkipEmpty())) switch CLIOutputFormat() { case "fastq": diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index cf5312f..3b36b9d 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -60,5 +60,5 @@ func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) { iterator = iterator.Pipe(frags) } - return iterator.MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers(), 0), nil + return iterator.MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers()), nil } diff --git a/pkg/obitools/obitag/obitag.go b/pkg/obitools/obitag/obitag.go index 9fdb7c3..48318d3 100644 --- a/pkg/obitools/obitag/obitag.go +++ b/pkg/obitools/obitag/obitag.go @@ -293,7 +293,9 @@ func CLIAssignTaxonomy(iterator obiiter.IBioSequence, if err == nil { j++ } else { - log.Warnf("Taxid %d is not described in the taxonomy. Sequence %s is discared from the reference database", seq.Taxid(), seq.Id()) + log.Warnf("Taxid %d is not described in the taxonomy."+ + " Sequence %s is discared from the reference database", + seq.Taxid(), seq.Id()) } }