2022-02-24 07:08:40 +01:00
|
|
|
package obiiter
|
2022-01-13 23:27:39 +01:00
|
|
|
|
|
|
|
import (
|
2022-02-24 12:14:52 +01:00
|
|
|
log "github.com/sirupsen/logrus"
|
2022-02-24 07:08:40 +01:00
|
|
|
|
2023-08-25 14:36:38 +02:00
|
|
|
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
|
2022-02-24 07:08:40 +01:00
|
|
|
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
2022-01-13 23:27:39 +01:00
|
|
|
)
|
|
|
|
|
2022-08-31 20:38:03 +02:00
|
|
|
// That method allows for applying a SeqWorker function on every sequences.
|
|
|
|
//
|
|
|
|
// Sequences are provided by the iterator and modified sequences are pushed
|
|
|
|
// on the returned IBioSequenceBatch.
|
|
|
|
//
|
|
|
|
// Moreover the SeqWorker function, the method accepted two optional integer parameters.
|
|
|
|
// - First is allowing to indicates the number of workers running in parallele (default 4)
|
|
|
|
// - The second the size of the chanel buffer. By default set to the same value than the input buffer.
|
2023-01-22 22:39:13 +01:00
|
|
|
func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) IBioSequence {
|
2023-08-25 14:36:38 +02:00
|
|
|
nworkers := obioptions.CLIParallelWorkers()
|
2022-01-13 23:27:39 +01:00
|
|
|
|
|
|
|
if len(sizes) > 0 {
|
|
|
|
nworkers = sizes[0]
|
|
|
|
}
|
|
|
|
|
2023-03-07 11:12:13 +07:00
|
|
|
newIter := MakeIBioSequence()
|
2022-01-13 23:27:39 +01:00
|
|
|
|
2022-01-14 17:32:12 +01:00
|
|
|
newIter.Add(nworkers)
|
2022-01-13 23:27:39 +01:00
|
|
|
|
|
|
|
go func() {
|
2022-02-21 19:00:23 +01:00
|
|
|
newIter.WaitAndClose()
|
2022-02-24 12:14:52 +01:00
|
|
|
log.Debugln("End of the batch workers")
|
2022-01-13 23:27:39 +01:00
|
|
|
|
|
|
|
}()
|
|
|
|
|
2023-01-22 22:04:17 +01:00
|
|
|
f := func(iterator IBioSequence) {
|
2022-01-13 23:27:39 +01:00
|
|
|
for iterator.Next() {
|
|
|
|
batch := iterator.Get()
|
|
|
|
for i, seq := range batch.slice {
|
|
|
|
batch.slice[i] = worker(seq)
|
|
|
|
}
|
2022-02-21 19:00:23 +01:00
|
|
|
newIter.Push(batch)
|
2022-01-13 23:27:39 +01:00
|
|
|
}
|
2022-01-14 17:32:12 +01:00
|
|
|
newIter.Done()
|
2022-01-13 23:27:39 +01:00
|
|
|
}
|
|
|
|
|
2022-02-24 12:14:52 +01:00
|
|
|
log.Debugln("Start of the batch workers")
|
2022-01-14 23:11:36 +01:00
|
|
|
for i := 0; i < nworkers-1; i++ {
|
2022-01-13 23:27:39 +01:00
|
|
|
go f(iterator.Split())
|
|
|
|
}
|
2022-01-14 23:11:36 +01:00
|
|
|
go f(iterator)
|
2022-01-13 23:27:39 +01:00
|
|
|
|
2023-02-23 23:35:58 +01:00
|
|
|
if iterator.IsPaired() {
|
|
|
|
newIter.MarkAsPaired()
|
|
|
|
}
|
|
|
|
|
2022-01-14 17:32:12 +01:00
|
|
|
return newIter
|
2022-01-13 23:27:39 +01:00
|
|
|
}
|
|
|
|
|
2023-01-22 22:04:17 +01:00
|
|
|
func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePredicate,
|
2023-01-22 22:39:13 +01:00
|
|
|
worker obiseq.SeqWorker, sizes ...int) IBioSequence {
|
2022-08-31 20:38:03 +02:00
|
|
|
nworkers := 4
|
|
|
|
|
|
|
|
if len(sizes) > 0 {
|
|
|
|
nworkers = sizes[0]
|
|
|
|
}
|
|
|
|
|
2023-03-07 11:12:13 +07:00
|
|
|
newIter := MakeIBioSequence()
|
2022-08-31 20:38:03 +02:00
|
|
|
|
|
|
|
newIter.Add(nworkers)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
newIter.WaitAndClose()
|
|
|
|
log.Debugln("End of the batch workers")
|
|
|
|
|
|
|
|
}()
|
|
|
|
|
2023-01-22 22:04:17 +01:00
|
|
|
f := func(iterator IBioSequence) {
|
2022-08-31 20:38:03 +02:00
|
|
|
for iterator.Next() {
|
|
|
|
batch := iterator.Get()
|
|
|
|
for i, seq := range batch.slice {
|
|
|
|
if predicate(batch.slice[i]) {
|
|
|
|
batch.slice[i] = worker(seq)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
newIter.Push(batch)
|
|
|
|
}
|
|
|
|
newIter.Done()
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debugln("Start of the batch workers")
|
|
|
|
for i := 0; i < nworkers-1; i++ {
|
|
|
|
go f(iterator.Split())
|
|
|
|
}
|
|
|
|
go f(iterator)
|
|
|
|
|
2023-02-23 23:35:58 +01:00
|
|
|
if iterator.IsPaired() {
|
|
|
|
newIter.MarkAsPaired()
|
|
|
|
}
|
|
|
|
|
2022-08-31 20:38:03 +02:00
|
|
|
return newIter
|
|
|
|
}
|
|
|
|
|
2023-01-22 22:39:13 +01:00
|
|
|
func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, sizes ...int) IBioSequence {
|
2022-01-13 23:27:39 +01:00
|
|
|
nworkers := 4
|
|
|
|
|
|
|
|
if len(sizes) > 0 {
|
|
|
|
nworkers = sizes[0]
|
|
|
|
}
|
|
|
|
|
2023-03-07 11:12:13 +07:00
|
|
|
newIter := MakeIBioSequence()
|
2022-01-13 23:27:39 +01:00
|
|
|
|
2022-01-14 17:32:12 +01:00
|
|
|
newIter.Add(nworkers)
|
2022-01-13 23:27:39 +01:00
|
|
|
|
|
|
|
go func() {
|
2022-02-21 19:00:23 +01:00
|
|
|
newIter.WaitAndClose()
|
2022-01-13 23:27:39 +01:00
|
|
|
log.Println("End of the batch slice workers")
|
|
|
|
}()
|
|
|
|
|
2023-01-22 22:04:17 +01:00
|
|
|
f := func(iterator IBioSequence) {
|
2022-01-13 23:27:39 +01:00
|
|
|
for iterator.Next() {
|
|
|
|
batch := iterator.Get()
|
|
|
|
batch.slice = worker(batch.slice)
|
2023-03-28 19:37:05 +07:00
|
|
|
newIter.Push(batch)
|
2022-01-13 23:27:39 +01:00
|
|
|
}
|
2022-01-14 17:32:12 +01:00
|
|
|
newIter.Done()
|
2022-01-13 23:27:39 +01:00
|
|
|
}
|
|
|
|
|
2023-03-07 11:12:13 +07:00
|
|
|
log.Printf("Start of the batch slice workers on %d workers\n", nworkers)
|
2022-01-15 19:10:16 +01:00
|
|
|
for i := 0; i < nworkers-1; i++ {
|
2022-01-13 23:27:39 +01:00
|
|
|
go f(iterator.Split())
|
|
|
|
}
|
2022-01-14 23:11:36 +01:00
|
|
|
go f(iterator)
|
2022-01-13 23:27:39 +01:00
|
|
|
|
2023-02-23 23:35:58 +01:00
|
|
|
if iterator.IsPaired() {
|
|
|
|
newIter.MarkAsPaired()
|
|
|
|
}
|
|
|
|
|
2022-01-14 17:32:12 +01:00
|
|
|
return newIter
|
2022-01-13 23:27:39 +01:00
|
|
|
}
|
2022-02-24 07:08:40 +01:00
|
|
|
|
2023-01-22 22:39:13 +01:00
|
|
|
func WorkerPipe(worker obiseq.SeqWorker, sizes ...int) Pipeable {
|
2023-01-22 22:04:17 +01:00
|
|
|
f := func(iterator IBioSequence) IBioSequence {
|
2022-08-21 13:41:58 +02:00
|
|
|
return iterator.MakeIWorker(worker, sizes...)
|
2022-02-24 07:08:40 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return f
|
|
|
|
}
|
|
|
|
|
2023-01-22 22:39:13 +01:00
|
|
|
func SliceWorkerPipe(worker obiseq.SeqSliceWorker, sizes ...int) Pipeable {
|
2023-01-22 22:04:17 +01:00
|
|
|
f := func(iterator IBioSequence) IBioSequence {
|
2022-08-21 13:41:58 +02:00
|
|
|
return iterator.MakeISliceWorker(worker, sizes...)
|
2022-02-24 07:08:40 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return f
|
|
|
|
}
|