Files
obitools4/pkg/obiiter/workers.go

207 lines
4.3 KiB
Go
Raw Normal View History

package obiiter
2022-01-13 23:27:39 +01:00
import (
2022-02-24 12:14:52 +01:00
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
2022-01-13 23:27:39 +01:00
)
// SeqAnnotator is a function that receives a sequence and returns nothing;
// any effect it has must happen through the pointer it is given.
type SeqAnnotator func(*obiseq.BioSequence)
2022-01-13 23:27:39 +01:00
// SeqWorker is a function that receives a sequence and returns a sequence.
type SeqWorker func(*obiseq.BioSequence) *obiseq.BioSequence

// SeqSliceWorker is a function that receives a slice of sequences and
// returns a slice of sequences.
type SeqSliceWorker func(obiseq.BioSequenceSlice) obiseq.BioSequenceSlice
2022-01-13 23:27:39 +01:00
func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker {
f := func(seq *obiseq.BioSequence) *obiseq.BioSequence {
2022-01-13 23:27:39 +01:00
function(seq)
return seq
}
return f
}
// That method allows for applying a SeqWorker function on every sequences.
//
// Sequences are provided by the iterator and modified sequences are pushed
// on the returned IBioSequenceBatch.
//
// Moreover the SeqWorker function, the method accepted two optional integer parameters.
// - First is allowing to indicates the number of workers running in parallele (default 4)
// - The second the size of the chanel buffer. By default set to the same value than the input buffer.
2022-01-13 23:27:39 +01:00
func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequenceBatch {
nworkers := 4
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
nworkers = sizes[0]
}
if len(sizes) > 1 {
buffsize = sizes[1]
}
2022-01-14 17:32:12 +01:00
newIter := MakeIBioSequenceBatch(buffsize)
2022-01-13 23:27:39 +01:00
2022-01-14 17:32:12 +01:00
newIter.Add(nworkers)
2022-01-13 23:27:39 +01:00
go func() {
newIter.WaitAndClose()
2022-02-24 12:14:52 +01:00
log.Debugln("End of the batch workers")
2022-01-13 23:27:39 +01:00
}()
f := func(iterator IBioSequenceBatch) {
for iterator.Next() {
batch := iterator.Get()
for i, seq := range batch.slice {
batch.slice[i] = worker(seq)
}
newIter.Push(batch)
2022-01-13 23:27:39 +01:00
}
2022-01-14 17:32:12 +01:00
newIter.Done()
2022-01-13 23:27:39 +01:00
}
2022-02-24 12:14:52 +01:00
log.Debugln("Start of the batch workers")
for i := 0; i < nworkers-1; i++ {
2022-01-13 23:27:39 +01:00
go f(iterator.Split())
}
go f(iterator)
2022-01-13 23:27:39 +01:00
2022-01-14 17:32:12 +01:00
return newIter
2022-01-13 23:27:39 +01:00
}
// MakeIConditionalWorker applies a SeqWorker function to the sequences of the
// iterator for which predicate returns true.
//
// Batches are read from the receiver. In each batch, a sequence accepted by
// predicate is replaced by the value returned by worker; the other sequences
// are left as they are. The batch is then pushed on the returned
// IBioSequenceBatch.
//
// Two optional integer parameters are accepted:
//   - the first is the number of workers running in parallel (default 4);
//     values lower than 1 are raised to 1;
//   - the second is the size of the channel buffer of the returned iterator
//     (by default the value reported by iterator.BufferSize()).
func (iterator IBioSequenceBatch) MakeIConditionalWorker(predicate obiseq.SequencePredicate,
	worker SeqWorker, sizes ...int) IBioSequenceBatch {
	nworkers := 4
	buffsize := iterator.BufferSize()

	if len(sizes) > 0 {
		nworkers = sizes[0]
	}

	if len(sizes) > 1 {
		buffsize = sizes[1]
	}

	// One worker goroutine is always started on the receiver itself (see the
	// end of this function) and calls Done when it finishes. With nworkers < 1
	// Add would register fewer workers than the number of Done calls.
	// NOTE(review): assuming Add/Done follow sync.WaitGroup semantics, that
	// mismatch panics — confirm against the iterator implementation.
	if nworkers < 1 {
		nworkers = 1
	}

	newIter := MakeIBioSequenceBatch(buffsize)

	newIter.Add(nworkers)

	// Closes the output once every worker has called Done.
	go func() {
		newIter.WaitAndClose()
		log.Debugln("End of the batch workers")
	}()

	// Body of a single worker: consumes its own view of the input.
	f := func(source IBioSequenceBatch) {
		for source.Next() {
			batch := source.Get()
			for i, seq := range batch.slice {
				if predicate(seq) {
					batch.slice[i] = worker(seq)
				}
			}
			newIter.Push(batch)
		}
		newIter.Done()
	}

	log.Debugln("Start of the batch workers")

	// nworkers-1 workers run on splits of the input, the last one on the
	// receiver itself.
	for i := 0; i < nworkers-1; i++ {
		go f(iterator.Split())
	}
	go f(iterator)

	return newIter
}
2022-01-13 23:27:39 +01:00
func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes ...int) IBioSequenceBatch {
nworkers := 4
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
nworkers = sizes[0]
}
if len(sizes) > 1 {
buffsize = sizes[1]
}
2022-01-14 17:32:12 +01:00
newIter := MakeIBioSequenceBatch(buffsize)
2022-01-13 23:27:39 +01:00
2022-01-14 17:32:12 +01:00
newIter.Add(nworkers)
2022-01-13 23:27:39 +01:00
go func() {
newIter.WaitAndClose()
2022-01-13 23:27:39 +01:00
log.Println("End of the batch slice workers")
}()
f := func(iterator IBioSequenceBatch) {
for iterator.Next() {
batch := iterator.Get()
batch.slice = worker(batch.slice)
2022-01-14 17:32:12 +01:00
newIter.pointer.channel <- batch
2022-01-13 23:27:39 +01:00
}
2022-01-14 17:32:12 +01:00
newIter.Done()
2022-01-13 23:27:39 +01:00
}
log.Printf("Start of the batch slice workers on %d workers (buffer : %d)\n", nworkers, buffsize)
for i := 0; i < nworkers-1; i++ {
2022-01-13 23:27:39 +01:00
go f(iterator.Split())
}
go f(iterator)
2022-01-13 23:27:39 +01:00
2022-01-14 17:32:12 +01:00
return newIter
2022-01-13 23:27:39 +01:00
}
// MakeIWorker applies a SeqWorker function to every sequence of the iterator.
//
// Sequences are read one by one from the receiver by a single goroutine,
// passed through worker, and sent on the channel of the returned
// IBioSequence. That channel is closed once the input is exhausted.
//
// The optional first integer of sizes is the size of the channel buffer of
// the returned iterator; by default the value reported by
// iterator.BufferSize() is used.
func (iterator IBioSequence) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequence {
	buffsize := iterator.BufferSize()
	if len(sizes) > 0 {
		buffsize = sizes[0]
	}

	output := MakeIBioSequence(buffsize)
	output.Add(1)

	// Single producer: transforms each input sequence and forwards it.
	go func() {
		for iterator.Next() {
			output.pointer.channel <- worker(iterator.Get())
		}
		output.Done()
	}()

	// Closer: waits for the producer to finish before closing the channel.
	go func() {
		output.Wait()
		close(output.pointer.channel)
	}()

	return output
}
// WorkerPipe builds a Pipeable that, given an IBioSequenceBatch, returns the
// result of calling its MakeIWorker method with worker and sizes.
func WorkerPipe(worker SeqWorker, sizes ...int) Pipeable {
	return func(input IBioSequenceBatch) IBioSequenceBatch {
		return input.MakeIWorker(worker, sizes...)
	}
}
// SliceWorkerPipe builds a Pipeable that, given an IBioSequenceBatch, returns
// the result of calling its MakeISliceWorker method with worker and sizes.
func SliceWorkerPipe(worker SeqSliceWorker, sizes ...int) Pipeable {
	return func(input IBioSequenceBatch) IBioSequenceBatch {
		return input.MakeISliceWorker(worker, sizes...)
	}
}
2022-10-05 09:41:59 +02:00
// ReverseComplementWorker builds a SeqWorker that returns the result of
// calling ReverseComplement(inplace) on the sequence it receives.
func ReverseComplementWorker(inplace bool) SeqWorker {
	return func(sequence *obiseq.BioSequence) *obiseq.BioSequence {
		return sequence.ReverseComplement(inplace)
	}
}