Files
obitools4/pkg/obiiter/workers.go

192 lines
5.5 KiB
Go
Raw Normal View History

package obiiter
2022-01-13 23:27:39 +01:00
import (
2022-02-24 12:14:52 +01:00
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
2022-01-13 23:27:39 +01:00
)
// That method allows for applying a SeqWorker function on every sequences.
//
// Sequences are provided by the iterator and modified sequences are pushed
// on the returned IBioSequenceBatch.
//
// Moreover the SeqWorker function, the method accepted two optional integer parameters.
// - First is allowing to indicates the number of workers running in parallele (default 4)
// - The second the size of the chanel buffer. By default set to the same value than the input buffer.
func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) IBioSequence {
nworkers := obioptions.CLIParallelWorkers()
2022-01-13 23:27:39 +01:00
if len(sizes) > 0 {
nworkers = sizes[0]
}
newIter := MakeIBioSequence()
2022-01-13 23:27:39 +01:00
2022-01-14 17:32:12 +01:00
newIter.Add(nworkers)
2022-01-13 23:27:39 +01:00
go func() {
newIter.WaitAndClose()
2022-02-24 12:14:52 +01:00
log.Debugln("End of the batch workers")
2022-01-13 23:27:39 +01:00
}()
2023-01-22 22:04:17 +01:00
f := func(iterator IBioSequence) {
2022-01-13 23:27:39 +01:00
for iterator.Next() {
batch := iterator.Get()
for i, seq := range batch.slice {
batch.slice[i] = worker(seq)
}
newIter.Push(batch)
2022-01-13 23:27:39 +01:00
}
2022-01-14 17:32:12 +01:00
newIter.Done()
2022-01-13 23:27:39 +01:00
}
2022-02-24 12:14:52 +01:00
log.Debugln("Start of the batch workers")
for i := 0; i < nworkers-1; i++ {
2022-01-13 23:27:39 +01:00
go f(iterator.Split())
}
go f(iterator)
2022-01-13 23:27:39 +01:00
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
2022-01-14 17:32:12 +01:00
return newIter
2022-01-13 23:27:39 +01:00
}
// MakeIConditionalWorker applies a given worker function to each sequence in the iterator that satisfies the given predicate.
// It creates a new iterator with the modified sequences and returns it.
//
// Parameters:
// - predicate: A function that takes a sequence and returns a boolean value indicating whether the sequence satisfies a certain condition.
// - worker: A function that takes a sequence and returns a modified version of the sequence.
// - sizes: Optional. One or more integers representing the number of workers to be used for parallel processing. If not provided, the number of workers will be determined by the obioptions.CLIReadParallelWorkers() function.
//
// Return:
// - newIter: A new IBioSequence iterator with the modified sequences.
2023-01-22 22:04:17 +01:00
func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePredicate,
worker obiseq.SeqWorker, sizes ...int) IBioSequence {
nworkers := obioptions.CLIReadParallelWorkers()
if len(sizes) > 0 {
nworkers = sizes[0]
}
newIter := MakeIBioSequence()
newIter.Add(nworkers)
go func() {
newIter.WaitAndClose()
log.Debugln("End of the batch workers")
}()
2023-01-22 22:04:17 +01:00
f := func(iterator IBioSequence) {
for iterator.Next() {
batch := iterator.Get()
for i, seq := range batch.slice {
if predicate(batch.slice[i]) {
batch.slice[i] = worker(seq)
}
}
newIter.Push(batch)
}
newIter.Done()
}
log.Debugln("Start of the batch workers")
for i := 0; i < nworkers-1; i++ {
go f(iterator.Split())
}
go f(iterator)
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}
// MakeISliceWorker applies a SeqSliceWorker function to each slice in the IBioSequence iterator,
// creating a new IBioSequence with the modified slices.
//
// The worker function takes a slice as input and returns a modified slice. It is applied to each
// slice in the iterator.
//
// The sizes argument is optional and specifies the number of workers to use. If sizes is not
// provided, the default number of workers is used.
//
// The function returns a new IBioSequence containing the modified slices.
func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, sizes ...int) IBioSequence {
nworkers := obioptions.CLIParallelWorkers()
2022-01-13 23:27:39 +01:00
if len(sizes) > 0 {
nworkers = sizes[0]
}
newIter := MakeIBioSequence()
2022-01-13 23:27:39 +01:00
2022-01-14 17:32:12 +01:00
newIter.Add(nworkers)
2022-01-13 23:27:39 +01:00
go func() {
newIter.WaitAndClose()
2022-01-13 23:27:39 +01:00
log.Println("End of the batch slice workers")
}()
2023-01-22 22:04:17 +01:00
f := func(iterator IBioSequence) {
2022-01-13 23:27:39 +01:00
for iterator.Next() {
batch := iterator.Get()
batch.slice = worker(batch.slice)
newIter.Push(batch)
2022-01-13 23:27:39 +01:00
}
2022-01-14 17:32:12 +01:00
newIter.Done()
2022-01-13 23:27:39 +01:00
}
log.Printf("Start of the batch slice workers on %d workers\n", nworkers)
for i := 0; i < nworkers-1; i++ {
2022-01-13 23:27:39 +01:00
go f(iterator.Split())
}
go f(iterator)
2022-01-13 23:27:39 +01:00
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
2022-01-14 17:32:12 +01:00
return newIter
2022-01-13 23:27:39 +01:00
}
// WorkerPipe is a function that takes a SeqWorker and a variadic list of sizes as parameters and returns a Pipeable.
//
// The WorkerPipe function creates a closure that takes an IBioSequence iterator as a parameter and returns an IBioSequence.
// Inside the closure, the MakeIWorker method of the iterator is called with the provided worker and sizes, and the result is returned.
//
// Parameters:
// - worker: A SeqWorker object that represents the worker to be used in the closure.
// - sizes: A variadic list of int values that represents the sizes to be used in the MakeIWorker method.
//
// Return:
// - f: A Pipeable object that represents the closure created by the WorkerPipe function.
func WorkerPipe(worker obiseq.SeqWorker, sizes ...int) Pipeable {
2023-01-22 22:04:17 +01:00
f := func(iterator IBioSequence) IBioSequence {
return iterator.MakeIWorker(worker, sizes...)
}
return f
}
// SliceWorkerPipe creates a Pipeable function that applies a SeqSliceWorker to an iterator.
//
// The worker parameter is the SeqSliceWorker to be applied.
// The sizes parameter is a variadic parameter representing the sizes of the slices.
// The function returns a Pipeable function that applies the SeqSliceWorker to the iterator.
func SliceWorkerPipe(worker obiseq.SeqSliceWorker, sizes ...int) Pipeable {
2023-01-22 22:04:17 +01:00
f := func(iterator IBioSequence) IBioSequence {
return iterator.MakeISliceWorker(worker, sizes...)
}
return f
}