Change the API of workers

Former-commit-id: 9b07306edd8cf28266f86f95823948fa99d39ea9
This commit is contained in:
2024-03-02 16:03:46 -04:00
parent 4a0b20484f
commit 0f3871d203
19 changed files with 194 additions and 120 deletions

View File

@ -15,7 +15,9 @@ import (
// Moreover the SeqWorker function, the method accepted two optional integer parameters.
// - First is allowing to indicates the number of workers running in parallele (default 4)
// - The second the size of the chanel buffer. By default set to the same value than the input buffer.
func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) IBioSequence {
func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker,
breakOnError bool,
sizes ...int) IBioSequence {
nworkers := obioptions.CLIParallelWorkers()
if len(sizes) > 0 {
@ -32,11 +34,15 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int)
}()
sw := obiseq.SeqToSliceWorker(worker, true, breakOnError)
f := func(iterator IBioSequence) {
var err error
for iterator.Next() {
batch := iterator.Get()
for i, seq := range batch.slice {
batch.slice[i] = worker(seq)
batch.slice, err = sw(batch.slice)
if err != nil && breakOnError {
log.Fatalf("Error on sequence processing : %v", err)
}
newIter.Push(batch)
}
@ -67,7 +73,7 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int)
// Return:
// - newIter: A new IBioSequence iterator with the modified sequences.
func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePredicate,
worker obiseq.SeqWorker, sizes ...int) IBioSequence {
worker obiseq.SeqWorker, breakOnError bool, sizes ...int) IBioSequence {
nworkers := obioptions.CLIReadParallelWorkers()
if len(sizes) > 0 {
@ -84,13 +90,15 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre
}()
sw := obiseq.SeqToSliceConditionalWorker(predicate, worker, true, breakOnError)
f := func(iterator IBioSequence) {
var err error
for iterator.Next() {
batch := iterator.Get()
for i, seq := range batch.slice {
if predicate(batch.slice[i]) {
batch.slice[i] = worker(seq)
}
batch.slice, err = sw(batch.slice)
if err != nil && breakOnError {
log.Fatalf("Error on sequence processing : %v", err)
}
newIter.Push(batch)
}
@ -120,7 +128,7 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre
// provided, the default number of workers is used.
//
// The function returns a new IBioSequence containing the modified slices.
func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, sizes ...int) IBioSequence {
func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, breakOnError bool, sizes ...int) IBioSequence {
nworkers := obioptions.CLIParallelWorkers()
if len(sizes) > 0 {
@ -137,9 +145,13 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, size
}()
f := func(iterator IBioSequence) {
var err error
for iterator.Next() {
batch := iterator.Get()
batch.slice = worker(batch.slice)
batch.slice, err = worker(batch.slice)
if err != nil && breakOnError {
log.Fatalf("Error on sequence processing : %v", err)
}
newIter.Push(batch)
}
newIter.Done()
@ -169,9 +181,9 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, size
//
// Return:
// - f: A Pipeable object that represents the closure created by the WorkerPipe function.
func WorkerPipe(worker obiseq.SeqWorker, sizes ...int) Pipeable {
func WorkerPipe(worker obiseq.SeqWorker, breakOnError bool, sizes ...int) Pipeable {
f := func(iterator IBioSequence) IBioSequence {
return iterator.MakeIWorker(worker, sizes...)
return iterator.MakeIWorker(worker, breakOnError, sizes...)
}
return f
@ -182,9 +194,9 @@ func WorkerPipe(worker obiseq.SeqWorker, sizes ...int) Pipeable {
// The worker parameter is the SeqSliceWorker to be applied.
// The sizes parameter is a variadic parameter representing the sizes of the slices.
// The function returns a Pipeable function that applies the SeqSliceWorker to the iterator.
func SliceWorkerPipe(worker obiseq.SeqSliceWorker, sizes ...int) Pipeable {
func SliceWorkerPipe(worker obiseq.SeqSliceWorker, breakOnError bool, sizes ...int) Pipeable {
f := func(iterator IBioSequence) IBioSequence {
return iterator.MakeISliceWorker(worker, sizes...)
return iterator.MakeISliceWorker(worker, breakOnError, sizes...)
}
return f