mirror of
https://github.com/metabarcoding/obitools4.git
synced 2025-12-08 08:40:26 +00:00
Some code refactoring, a new and more memory-efficient version of obiuniq, and a first Makefile for building obitools
This commit is contained in:
53
pkg/obiiter/batch.go
Normal file
53
pkg/obiiter/batch.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package obiiter
|
||||
|
||||
import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
||||
|
||||
type BioSequenceBatch struct {
|
||||
slice obiseq.BioSequenceSlice
|
||||
order int
|
||||
}
|
||||
|
||||
// NilBioSequenceBatch is the batch whose slice is nil and whose order is -1.
var NilBioSequenceBatch = BioSequenceBatch{slice: nil, order: -1}
|
||||
|
||||
func MakeBioSequenceBatch(order int,
|
||||
sequences obiseq.BioSequenceSlice) BioSequenceBatch {
|
||||
|
||||
return BioSequenceBatch{
|
||||
slice: sequences,
|
||||
order: order,
|
||||
}
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) Order() int {
|
||||
return batch.order
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) Reorder(newOrder int) BioSequenceBatch {
|
||||
batch.order = newOrder
|
||||
return batch
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) Slice() obiseq.BioSequenceSlice {
|
||||
return batch.slice
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) Length() int {
|
||||
return len(batch.slice)
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) NotEmpty() bool {
|
||||
return batch.slice.NotEmpty()
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) Pop0() *obiseq.BioSequence {
|
||||
return batch.slice.Pop0()
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) IsNil() bool {
|
||||
return batch.slice == nil
|
||||
}
|
||||
|
||||
func (batch BioSequenceBatch) Recycle() {
|
||||
batch.slice.Recycle()
|
||||
batch.slice = nil
|
||||
}
|
||||
560
pkg/obiiter/batchiterator.go
Normal file
560
pkg/obiiter/batchiterator.go
Normal file
@@ -0,0 +1,560 @@
|
||||
package obiiter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
||||
"github.com/tevino/abool/v2"
|
||||
)
|
||||
|
||||
// Structure implementing an iterator over bioseq.BioSequenceBatch
|
||||
// based on a channel.
|
||||
type _IBioSequenceBatch struct {
|
||||
channel chan BioSequenceBatch
|
||||
current BioSequenceBatch
|
||||
pushBack *abool.AtomicBool
|
||||
all_done *sync.WaitGroup
|
||||
lock *sync.RWMutex
|
||||
buffer_size int32
|
||||
batch_size int32
|
||||
sequence_format string
|
||||
finished *abool.AtomicBool
|
||||
}
|
||||
|
||||
type IBioSequenceBatch struct {
|
||||
pointer *_IBioSequenceBatch
|
||||
}
|
||||
|
||||
// NilIBioSequenceBatch nil instance for IBioSequenceBatch
|
||||
//
|
||||
// NilIBioSequenceBatch is the nil instance for the
|
||||
// IBioSequenceBatch type.
|
||||
//
|
||||
var NilIBioSequenceBatch = IBioSequenceBatch{pointer: nil}
|
||||
|
||||
func MakeIBioSequenceBatch(sizes ...int) IBioSequenceBatch {
|
||||
buffsize := int32(1)
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = int32(sizes[0])
|
||||
}
|
||||
|
||||
i := _IBioSequenceBatch{
|
||||
channel: make(chan BioSequenceBatch, buffsize),
|
||||
current: NilBioSequenceBatch,
|
||||
pushBack: abool.New(),
|
||||
buffer_size: buffsize,
|
||||
batch_size: -1,
|
||||
sequence_format: "",
|
||||
finished: abool.New(),
|
||||
}
|
||||
|
||||
waiting := sync.WaitGroup{}
|
||||
i.all_done = &waiting
|
||||
lock := sync.RWMutex{}
|
||||
i.lock = &lock
|
||||
ii := IBioSequenceBatch{&i}
|
||||
return ii
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Add(n int) {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.Add method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
iterator.pointer.all_done.Add(n)
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Done() {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.Done method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
iterator.pointer.all_done.Done()
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Unlock() {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.Unlock method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
iterator.pointer.lock.Unlock()
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Lock() {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.Lock method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
iterator.pointer.lock.Lock()
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) RLock() {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.RLock method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
iterator.pointer.lock.RLock()
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) RUnlock() {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.RUnlock method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
iterator.pointer.lock.RUnlock()
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Wait() {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.Wait method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
iterator.pointer.all_done.Wait()
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Channel() chan BioSequenceBatch {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.Channel method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
return iterator.pointer.channel
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) IsNil() bool {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.IsNil method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
return iterator.pointer == nil
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) BufferSize() int {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.BufferSize method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
return int(atomic.LoadInt32(&iterator.pointer.buffer_size))
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) BatchSize() int {
|
||||
if iterator.pointer == nil {
|
||||
log.Panic("call of IBioSequenceBatch.BatchSize method on NilIBioSequenceBatch")
|
||||
}
|
||||
|
||||
return int(atomic.LoadInt32(&iterator.pointer.batch_size))
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) SetBatchSize(size int) error {
|
||||
if size >= 0 {
|
||||
atomic.StoreInt32(&iterator.pointer.batch_size, int32(size))
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("size (%d) cannot be negative", size)
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Split() IBioSequenceBatch {
|
||||
iterator.pointer.lock.RLock()
|
||||
defer iterator.pointer.lock.RUnlock()
|
||||
i := _IBioSequenceBatch{
|
||||
channel: iterator.pointer.channel,
|
||||
current: NilBioSequenceBatch,
|
||||
pushBack: abool.New(),
|
||||
all_done: iterator.pointer.all_done,
|
||||
buffer_size: iterator.pointer.buffer_size,
|
||||
batch_size: iterator.pointer.batch_size,
|
||||
sequence_format: iterator.pointer.sequence_format,
|
||||
finished: iterator.pointer.finished}
|
||||
lock := sync.RWMutex{}
|
||||
i.lock = &lock
|
||||
|
||||
newIter := IBioSequenceBatch{&i}
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Next() bool {
|
||||
if iterator.pointer.pushBack.IsSet() {
|
||||
iterator.pointer.pushBack.UnSet()
|
||||
return true
|
||||
}
|
||||
|
||||
if iterator.pointer.finished.IsSet() {
|
||||
return false
|
||||
}
|
||||
|
||||
next, ok := (<-iterator.pointer.channel)
|
||||
|
||||
if ok {
|
||||
iterator.pointer.current = next
|
||||
return true
|
||||
}
|
||||
|
||||
iterator.pointer.current = NilBioSequenceBatch
|
||||
iterator.pointer.finished.Set()
|
||||
return false
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) PushBack() {
|
||||
if !iterator.pointer.current.IsNil() {
|
||||
iterator.pointer.pushBack.Set()
|
||||
}
|
||||
}
|
||||
|
||||
// The 'Get' method returns the instance of BioSequenceBatch
|
||||
// currently pointed by the iterator. You have to use the
|
||||
// 'Next' method to move to the next entry before calling
|
||||
// 'Get' to retreive the following instance.
|
||||
func (iterator IBioSequenceBatch) Get() BioSequenceBatch {
|
||||
return iterator.pointer.current
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Push(batch BioSequenceBatch) {
|
||||
if batch.IsNil() {
|
||||
log.Panicln("An Nil batch is pushed on the channel")
|
||||
}
|
||||
if batch.Length() == 0 {
|
||||
log.Panicln("An empty batch is pushed on the channel")
|
||||
}
|
||||
|
||||
iterator.pointer.channel <- batch
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Close() {
|
||||
close(iterator.pointer.channel)
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) WaitAndClose() {
|
||||
iterator.Wait()
|
||||
|
||||
for len(iterator.Channel()) > 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
iterator.Close()
|
||||
}
|
||||
|
||||
// Finished returns 'true' value if no more data is available
|
||||
// from the iterator.
|
||||
func (iterator IBioSequenceBatch) Finished() bool {
|
||||
return iterator.pointer.finished.IsSet()
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) IBioSequence(sizes ...int) IBioSequence {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequence(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.Channel())
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for iterator.Next() {
|
||||
batch := iterator.Get()
|
||||
|
||||
for batch.NotEmpty() {
|
||||
newIter.Channel() <- batch.Pop0()
|
||||
}
|
||||
batch.Recycle()
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) SortBatches(sizes ...int) IBioSequenceBatch {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
next_to_send := 0
|
||||
received := make(map[int]BioSequenceBatch)
|
||||
go func() {
|
||||
for iterator.Next() {
|
||||
batch := iterator.Get()
|
||||
if batch.order == next_to_send {
|
||||
newIter.pointer.channel <- batch
|
||||
next_to_send++
|
||||
batch, ok := received[next_to_send]
|
||||
for ok {
|
||||
newIter.pointer.channel <- batch
|
||||
delete(received, next_to_send)
|
||||
next_to_send++
|
||||
batch, ok = received[next_to_send]
|
||||
}
|
||||
} else {
|
||||
received[batch.order] = batch
|
||||
}
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Concat(iterators ...IBioSequenceBatch) IBioSequenceBatch {
|
||||
|
||||
if len(iterators) == 0 {
|
||||
return iterator
|
||||
}
|
||||
|
||||
buffsize := iterator.BufferSize()
|
||||
newIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.Channel())
|
||||
}()
|
||||
|
||||
go func() {
|
||||
previous_max := 0
|
||||
max_order := 0
|
||||
|
||||
for iterator.Next() {
|
||||
s := iterator.Get()
|
||||
if s.order > max_order {
|
||||
max_order = s.order
|
||||
}
|
||||
newIter.Push(s.Reorder(s.order + previous_max))
|
||||
}
|
||||
|
||||
previous_max = max_order + 1
|
||||
for _, iter := range iterators {
|
||||
for iter.Next() {
|
||||
s := iter.Get()
|
||||
if (s.order + previous_max) > max_order {
|
||||
max_order = s.order + previous_max
|
||||
}
|
||||
|
||||
newIter.Push(s.Reorder(s.order + previous_max))
|
||||
}
|
||||
previous_max = max_order + 1
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
// Redistributes sequences from a IBioSequenceBatch into a new
|
||||
// IBioSequenceBatch with every batches having the same size
|
||||
// indicated in parameter. Rebatching implies to sort the
|
||||
// source IBioSequenceBatch.
|
||||
func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBatch {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
order := 0
|
||||
iterator = iterator.SortBatches()
|
||||
buffer := obiseq.MakeBioSequenceSlice()
|
||||
|
||||
for iterator.Next() {
|
||||
seqs := iterator.Get()
|
||||
for _, s := range seqs.slice {
|
||||
buffer = append(buffer, s)
|
||||
if len(buffer) == size {
|
||||
newIter.Push(MakeBioSequenceBatch(order, buffer))
|
||||
order++
|
||||
buffer = obiseq.MakeBioSequenceSlice()
|
||||
}
|
||||
}
|
||||
seqs.Recycle()
|
||||
}
|
||||
|
||||
if len(buffer) > 0 {
|
||||
newIter.Push(MakeBioSequenceBatch(order, buffer))
|
||||
}
|
||||
|
||||
newIter.Done()
|
||||
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Recycle() {
|
||||
|
||||
log.Println("Start recycling of Bioseq objects")
|
||||
recycled := 0
|
||||
for iterator.Next() {
|
||||
// iterator.Get()
|
||||
batch := iterator.Get()
|
||||
for _, seq := range batch.Slice() {
|
||||
seq.Recycle()
|
||||
recycled++
|
||||
}
|
||||
batch.Recycle()
|
||||
}
|
||||
log.Printf("End of the recycling of %d Bioseq objects", recycled)
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Count(recycle bool) (int, int, int) {
|
||||
variants := 0
|
||||
reads := 0
|
||||
nucleotides := 0
|
||||
|
||||
log.Println("Start counting of Bioseq objects")
|
||||
for iterator.Next() {
|
||||
// iterator.Get()
|
||||
batch := iterator.Get()
|
||||
for _, seq := range batch.Slice() {
|
||||
variants++
|
||||
reads += seq.Count()
|
||||
nucleotides += seq.Length()
|
||||
|
||||
if recycle {
|
||||
seq.Recycle()
|
||||
}
|
||||
}
|
||||
batch.Recycle()
|
||||
}
|
||||
log.Printf("End of the counting of %d Bioseq objects", variants)
|
||||
return variants, reads, nucleotides
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) PairWith(reverse IBioSequenceBatch,
|
||||
sizes ...int) IPairedBioSequenceBatch {
|
||||
buffsize := iterator.BufferSize()
|
||||
batchsize := 5000
|
||||
|
||||
if len(sizes) > 0 {
|
||||
batchsize = sizes[0]
|
||||
}
|
||||
|
||||
if len(sizes) > 1 {
|
||||
buffsize = sizes[1]
|
||||
}
|
||||
|
||||
iterator = iterator.Rebatch(batchsize)
|
||||
reverse = reverse.Rebatch(batchsize)
|
||||
|
||||
newIter := MakeIPairedBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.Channel())
|
||||
log.Println("End of association of paired reads")
|
||||
}()
|
||||
|
||||
log.Println("Start association of paired reads")
|
||||
go func() {
|
||||
for iterator.Next() {
|
||||
if !reverse.Next() {
|
||||
log.Panicln("Etrange reverse pas prêt")
|
||||
}
|
||||
newIter.Channel() <- MakePairedBioSequenceBatch(iterator.Get(),
|
||||
reverse.Get())
|
||||
}
|
||||
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) DivideOn(predicate obiseq.SequencePredicate,
|
||||
size int, sizes ...int) (IBioSequenceBatch, IBioSequenceBatch) {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
trueIter := MakeIBioSequenceBatch(buffsize)
|
||||
falseIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
trueIter.Add(1)
|
||||
falseIter.Add(1)
|
||||
|
||||
go func() {
|
||||
trueIter.WaitAndClose()
|
||||
falseIter.WaitAndClose()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
trueOrder := 0
|
||||
falseOrder := 0
|
||||
iterator = iterator.SortBatches()
|
||||
|
||||
trueSlice := obiseq.MakeBioSequenceSlice()
|
||||
falseSlice := obiseq.MakeBioSequenceSlice()
|
||||
|
||||
for iterator.Next() {
|
||||
seqs := iterator.Get()
|
||||
for _, s := range seqs.slice {
|
||||
if predicate(s) {
|
||||
trueSlice = append(trueSlice, s)
|
||||
} else {
|
||||
falseSlice = append(falseSlice, s)
|
||||
}
|
||||
|
||||
if len(trueSlice) == size {
|
||||
trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice))
|
||||
trueOrder++
|
||||
trueSlice = obiseq.MakeBioSequenceSlice()
|
||||
}
|
||||
|
||||
if len(falseSlice) == size {
|
||||
falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice))
|
||||
falseOrder++
|
||||
falseSlice = obiseq.MakeBioSequenceSlice()
|
||||
}
|
||||
}
|
||||
seqs.Recycle()
|
||||
}
|
||||
|
||||
if len(trueSlice) > 0 {
|
||||
trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice))
|
||||
}
|
||||
|
||||
if len(falseSlice) > 0 {
|
||||
falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice))
|
||||
}
|
||||
|
||||
trueIter.Done()
|
||||
falseIter.Done()
|
||||
}()
|
||||
|
||||
return trueIter, falseIter
|
||||
}
|
||||
117
pkg/obiiter/distribute.go
Normal file
117
pkg/obiiter/distribute.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package obiiter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
||||
)
|
||||
|
||||
type IDistribute struct {
|
||||
outputs map[int]IBioSequenceBatch
|
||||
news chan int
|
||||
classifier *obiseq.BioSequenceClassifier
|
||||
lock *sync.Mutex
|
||||
}
|
||||
|
||||
func (dist *IDistribute) Outputs(key int) (IBioSequenceBatch, error) {
|
||||
dist.lock.Lock()
|
||||
iter, ok := dist.outputs[key]
|
||||
dist.lock.Unlock()
|
||||
|
||||
if !ok {
|
||||
return NilIBioSequenceBatch, fmt.Errorf("code %d unknown", key)
|
||||
}
|
||||
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
func (dist *IDistribute) News() chan int {
|
||||
return dist.news
|
||||
}
|
||||
|
||||
func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier {
|
||||
return dist.classifier
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute {
|
||||
batchsize := 5000
|
||||
buffsize := 2
|
||||
|
||||
outputs := make(map[int]IBioSequenceBatch, 100)
|
||||
slices := make(map[int]*obiseq.BioSequenceSlice, 100)
|
||||
orders := make(map[int]int, 100)
|
||||
news := make(chan int)
|
||||
|
||||
if len(sizes) > 0 {
|
||||
batchsize = sizes[0]
|
||||
}
|
||||
|
||||
if len(sizes) > 1 {
|
||||
buffsize = sizes[1]
|
||||
}
|
||||
|
||||
jobDone := sync.WaitGroup{}
|
||||
lock := sync.Mutex{}
|
||||
|
||||
jobDone.Add(1)
|
||||
|
||||
go func() {
|
||||
jobDone.Wait()
|
||||
close(news)
|
||||
for _, i := range outputs {
|
||||
i.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
iterator = iterator.SortBatches()
|
||||
|
||||
for iterator.Next() {
|
||||
seqs := iterator.Get()
|
||||
for _, s := range seqs.Slice() {
|
||||
key := class.Code(s)
|
||||
slice, ok := slices[key]
|
||||
|
||||
if !ok {
|
||||
s := obiseq.MakeBioSequenceSlice()
|
||||
slice = &s
|
||||
slices[key] = slice
|
||||
orders[key] = 0
|
||||
|
||||
lock.Lock()
|
||||
outputs[key] = MakeIBioSequenceBatch(buffsize)
|
||||
lock.Unlock()
|
||||
|
||||
news <- key
|
||||
}
|
||||
|
||||
*slice = append(*slice, s)
|
||||
|
||||
if len(*slice) == batchsize {
|
||||
outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice))
|
||||
orders[key]++
|
||||
s := obiseq.MakeBioSequenceSlice()
|
||||
slices[key] = &s
|
||||
}
|
||||
}
|
||||
seqs.Recycle()
|
||||
}
|
||||
|
||||
for key, slice := range slices {
|
||||
if len(*slice) > 0 {
|
||||
outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice))
|
||||
}
|
||||
}
|
||||
|
||||
jobDone.Done()
|
||||
|
||||
}()
|
||||
|
||||
return IDistribute{
|
||||
outputs,
|
||||
news,
|
||||
class,
|
||||
&lock}
|
||||
|
||||
}
|
||||
342
pkg/obiiter/iterator.go
Normal file
342
pkg/obiiter/iterator.go
Normal file
@@ -0,0 +1,342 @@
|
||||
package obiiter
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
||||
)
|
||||
|
||||
// Private structure implementing an iterator over
|
||||
// bioseq.BioSequence based on a channel.
|
||||
type __ibiosequence__ struct {
|
||||
channel chan *obiseq.BioSequence
|
||||
current *obiseq.BioSequence
|
||||
pushBack bool
|
||||
all_done *sync.WaitGroup
|
||||
buffer_size int
|
||||
finished bool
|
||||
pFinished *bool
|
||||
}
|
||||
|
||||
type IBioSequence struct {
|
||||
pointer *__ibiosequence__
|
||||
}
|
||||
|
||||
// NilIBioSequence is the nil instance of the IBioSequence type: its
// internal pointer is nil.
var NilIBioSequence = IBioSequence{pointer: nil}
|
||||
|
||||
func (iterator IBioSequence) IsNil() bool {
|
||||
return iterator.pointer == nil
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Add(n int) {
|
||||
iterator.pointer.all_done.Add(n)
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Done() {
|
||||
iterator.pointer.all_done.Done()
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Wait() {
|
||||
iterator.pointer.all_done.Wait()
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Channel() chan *obiseq.BioSequence {
|
||||
return iterator.pointer.channel
|
||||
}
|
||||
func (iterator IBioSequence) PChannel() *chan *obiseq.BioSequence {
|
||||
return &(iterator.pointer.channel)
|
||||
}
|
||||
|
||||
func MakeIBioSequence(sizes ...int) IBioSequence {
|
||||
buffsize := 1
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
i := __ibiosequence__{
|
||||
channel: make(chan *obiseq.BioSequence, buffsize),
|
||||
current: nil,
|
||||
pushBack: false,
|
||||
buffer_size: buffsize,
|
||||
finished: false,
|
||||
pFinished: nil,
|
||||
}
|
||||
|
||||
i.pFinished = &i.finished
|
||||
waiting := sync.WaitGroup{}
|
||||
i.all_done = &waiting
|
||||
ii := IBioSequence{&i}
|
||||
return ii
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Split() IBioSequence {
|
||||
|
||||
i := __ibiosequence__{
|
||||
channel: iterator.pointer.channel,
|
||||
current: nil,
|
||||
pushBack: false,
|
||||
finished: false,
|
||||
all_done: iterator.pointer.all_done,
|
||||
buffer_size: iterator.pointer.buffer_size,
|
||||
pFinished: iterator.pointer.pFinished,
|
||||
}
|
||||
|
||||
newIter := IBioSequence{&i}
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Next() bool {
|
||||
if iterator.IsNil() || *(iterator.pointer.pFinished) {
|
||||
iterator.pointer.current = nil
|
||||
return false
|
||||
}
|
||||
|
||||
if iterator.pointer.pushBack {
|
||||
iterator.pointer.pushBack = false
|
||||
return true
|
||||
}
|
||||
|
||||
next, ok := (<-iterator.pointer.channel)
|
||||
|
||||
if ok {
|
||||
iterator.pointer.current = next
|
||||
return true
|
||||
}
|
||||
|
||||
iterator.pointer.current = nil
|
||||
*iterator.pointer.pFinished = true
|
||||
return false
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) PushBack() {
|
||||
if !(iterator.pointer.current == nil) {
|
||||
iterator.pointer.pushBack = true
|
||||
}
|
||||
}
|
||||
|
||||
// The 'Get' method returns the instance of BioSequence
|
||||
// currently pointed by the iterator. You have to use the
|
||||
// 'Next' method to move to the next entry before calling
|
||||
// 'Get' to retreive the following instance.
|
||||
func (iterator IBioSequence) Get() *obiseq.BioSequence {
|
||||
return iterator.pointer.current
|
||||
}
|
||||
|
||||
// Finished returns 'true' value if no more data is available
|
||||
// from the iterator.
|
||||
func (iterator IBioSequence) Finished() bool {
|
||||
return *iterator.pointer.pFinished
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) BufferSize() int {
|
||||
return iterator.pointer.buffer_size
|
||||
}
|
||||
|
||||
// The IBioSequenceBatch converts a IBioSequence iterator
|
||||
// into an iterator oveer batches oof sequences. By default
|
||||
// the size of a batch is of 100 sequences and the iterator
|
||||
// implements a buffer equal to that of the source iterator.
|
||||
// These defaults can be overriden by specifying one or two
|
||||
// optional parametters at the method call. The first one
|
||||
// indicates the batch size. The second optional parametter
|
||||
// indicates the size of the buffer.
|
||||
func (iterator IBioSequence) IBioSequenceBatch(sizes ...int) IBioSequenceBatch {
|
||||
batchsize := 100
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
batchsize = sizes[0]
|
||||
}
|
||||
if len(sizes) > 1 {
|
||||
buffsize = sizes[1]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.WaitAndClose()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for j := 0; !iterator.Finished(); j++ {
|
||||
batch := BioSequenceBatch{
|
||||
slice: obiseq.MakeBioSequenceSlice(),
|
||||
order: j}
|
||||
for i := 0; i < batchsize && iterator.Next(); i++ {
|
||||
seq := iterator.Get()
|
||||
batch.slice = append(batch.slice, seq)
|
||||
}
|
||||
newIter.pointer.channel <- batch
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) IBioSequence(sizes ...int) IBioSequence {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequence(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for iterator.Next() {
|
||||
s := iterator.Get()
|
||||
newIter.pointer.channel <- s
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Skip(n int, sizes ...int) IBioSequence {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequence(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for i := 0; iterator.Next(); i++ {
|
||||
if i >= n {
|
||||
s := iterator.Get()
|
||||
newIter.pointer.channel <- s
|
||||
}
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Head(n int, sizes ...int) IBioSequence {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequence(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
not_done := true
|
||||
for i := 0; iterator.Next(); i++ {
|
||||
if i < n {
|
||||
s := iterator.Get()
|
||||
newIter.pointer.channel <- s
|
||||
} else {
|
||||
if not_done {
|
||||
newIter.Done()
|
||||
not_done = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
// The 'Tail' method discard every data from the source iterator
|
||||
// except the 'n' last ones.
|
||||
func (iterator IBioSequence) Tail(n int, sizes ...int) IBioSequence {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequence(buffsize)
|
||||
buffseq := obiseq.MakeBioSequenceSlice()
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
var i int
|
||||
for i = 0; iterator.Next(); i++ {
|
||||
buffseq[i%n] = iterator.Get()
|
||||
}
|
||||
if i > n {
|
||||
for j := 0; j < n; j++ {
|
||||
newIter.Channel() <- buffseq[(i+j)%n]
|
||||
}
|
||||
|
||||
} else {
|
||||
for j := 0; j < i; j++ {
|
||||
newIter.Channel() <- buffseq[j]
|
||||
}
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) Concat(iterators ...IBioSequence) IBioSequence {
|
||||
|
||||
if len(iterators) == 0 {
|
||||
return iterator
|
||||
}
|
||||
|
||||
buffsize := iterator.BufferSize()
|
||||
newIter := MakeIBioSequence(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for iterator.Next() {
|
||||
s := iterator.Get()
|
||||
newIter.pointer.channel <- s
|
||||
}
|
||||
|
||||
for _, iter := range iterators {
|
||||
for iter.Next() {
|
||||
s := iter.Get()
|
||||
newIter.pointer.channel <- s
|
||||
}
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
50
pkg/obiiter/merge.go
Normal file
50
pkg/obiiter/merge.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package obiiter
|
||||
|
||||
import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
||||
|
||||
func (iterator IBioSequenceBatch) IMergeSequenceBatch(na string, statsOn []string, sizes ...int) IBioSequenceBatch {
|
||||
batchsize := 100
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
batchsize = sizes[0]
|
||||
}
|
||||
if len(sizes) > 1 {
|
||||
buffsize = sizes[1]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.WaitAndClose()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for j := 0; !iterator.Finished(); j++ {
|
||||
batch := BioSequenceBatch{
|
||||
slice: obiseq.MakeBioSequenceSlice(),
|
||||
order: j}
|
||||
for i := 0; i < batchsize && iterator.Next(); i++ {
|
||||
seqs := iterator.Get()
|
||||
batch.slice = append(batch.slice, seqs.slice.Merge(na, statsOn))
|
||||
}
|
||||
if batch.Length() > 0 {
|
||||
newIter.Push(batch)
|
||||
}
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
|
||||
func MergePipe(na string, statsOn []string, sizes ...int) Pipeable {
|
||||
f := func(iterator IBioSequenceBatch) IBioSequenceBatch {
|
||||
return iterator.IMergeSequenceBatch(na,statsOn,sizes...)
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
221
pkg/obiiter/pairedbatchiterator.go
Normal file
221
pkg/obiiter/pairedbatchiterator.go
Normal file
@@ -0,0 +1,221 @@
|
||||
package obiiter
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
||||
)
|
||||
|
||||
type PairedBioSequenceBatch struct {
|
||||
forward obiseq.BioSequenceSlice
|
||||
reverse obiseq.BioSequenceSlice
|
||||
order int
|
||||
}
|
||||
|
||||
// NilPairedBioSequenceBatch is the paired batch whose two slices are
// nil and whose order is -1.
var NilPairedBioSequenceBatch = PairedBioSequenceBatch{forward: nil, reverse: nil, order: -1}
|
||||
|
||||
func MakePairedBioSequenceBatch(forward, reverse BioSequenceBatch) PairedBioSequenceBatch {
|
||||
if forward.order != reverse.order {
|
||||
log.Fatalf("Forward order : %d and reverse order : %d are not matching",
|
||||
forward.order, reverse.order)
|
||||
}
|
||||
|
||||
for i := range reverse.slice {
|
||||
reverse.slice[i].ReverseComplement(true)
|
||||
}
|
||||
|
||||
return PairedBioSequenceBatch{
|
||||
forward: forward.slice,
|
||||
reverse: reverse.slice,
|
||||
order: forward.order,
|
||||
}
|
||||
}
|
||||
|
||||
func (batch PairedBioSequenceBatch) Order() int {
|
||||
return batch.order
|
||||
}
|
||||
|
||||
func (batch PairedBioSequenceBatch) Reorder(newOrder int) PairedBioSequenceBatch {
|
||||
batch.order = newOrder
|
||||
return batch
|
||||
}
|
||||
|
||||
|
||||
func (batch PairedBioSequenceBatch) Length() int {
|
||||
return len(batch.forward)
|
||||
}
|
||||
|
||||
func (batch PairedBioSequenceBatch) Forward() obiseq.BioSequenceSlice {
|
||||
return batch.forward
|
||||
}
|
||||
|
||||
func (batch PairedBioSequenceBatch) Reverse() obiseq.BioSequenceSlice {
|
||||
return batch.reverse
|
||||
}
|
||||
|
||||
func (batch PairedBioSequenceBatch) IsNil() bool {
|
||||
return batch.forward == nil
|
||||
}
|
||||
|
||||
// Structure implementing an iterator over bioseq.BioSequenceBatch
|
||||
// based on a channel.
|
||||
type __ipairedbiosequencebatch__ struct {
|
||||
channel chan PairedBioSequenceBatch
|
||||
current PairedBioSequenceBatch
|
||||
pushBack bool
|
||||
all_done *sync.WaitGroup
|
||||
buffer_size int
|
||||
finished bool
|
||||
p_finished *bool
|
||||
}
|
||||
|
||||
type IPairedBioSequenceBatch struct {
|
||||
pointer *__ipairedbiosequencebatch__
|
||||
}
|
||||
|
||||
// NilIPairedBioSequenceBatch is the iterator handle whose pointer is nil.
var NilIPairedBioSequenceBatch = IPairedBioSequenceBatch{pointer: nil}
|
||||
|
||||
func MakeIPairedBioSequenceBatch(sizes ...int) IPairedBioSequenceBatch {
|
||||
buffsize := 1
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
i := __ipairedbiosequencebatch__{
|
||||
channel: make(chan PairedBioSequenceBatch, buffsize),
|
||||
current: NilPairedBioSequenceBatch,
|
||||
pushBack: false,
|
||||
buffer_size: buffsize,
|
||||
finished: false,
|
||||
p_finished: nil,
|
||||
}
|
||||
|
||||
i.p_finished = &i.finished
|
||||
waiting := sync.WaitGroup{}
|
||||
i.all_done = &waiting
|
||||
ii := IPairedBioSequenceBatch{&i}
|
||||
return ii
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) Add(n int) {
|
||||
iterator.pointer.all_done.Add(n)
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) Done() {
|
||||
iterator.pointer.all_done.Done()
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) Wait() {
|
||||
iterator.pointer.all_done.Wait()
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) Channel() chan PairedBioSequenceBatch {
|
||||
return iterator.pointer.channel
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) IsNil() bool {
|
||||
return iterator.pointer == nil
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) BufferSize() int {
|
||||
return iterator.pointer.buffer_size
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) Split() IPairedBioSequenceBatch {
|
||||
i := __ipairedbiosequencebatch__{
|
||||
channel: iterator.pointer.channel,
|
||||
current: NilPairedBioSequenceBatch,
|
||||
pushBack: false,
|
||||
all_done: iterator.pointer.all_done,
|
||||
buffer_size: iterator.pointer.buffer_size,
|
||||
finished: false,
|
||||
p_finished: iterator.pointer.p_finished}
|
||||
newIter := IPairedBioSequenceBatch{&i}
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) Next() bool {
|
||||
if *(iterator.pointer.p_finished) {
|
||||
return false
|
||||
}
|
||||
|
||||
if iterator.pointer.pushBack {
|
||||
iterator.pointer.pushBack = false
|
||||
return true
|
||||
}
|
||||
|
||||
next, ok := (<-iterator.pointer.channel)
|
||||
|
||||
if ok {
|
||||
iterator.pointer.current = next
|
||||
return true
|
||||
}
|
||||
|
||||
iterator.pointer.current = NilPairedBioSequenceBatch
|
||||
*iterator.pointer.p_finished = true
|
||||
return false
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) PushBack() {
|
||||
if !iterator.pointer.current.IsNil() {
|
||||
iterator.pointer.pushBack = true
|
||||
}
|
||||
}
|
||||
|
||||
// The 'Get' method returns the instance of BioSequenceBatch
|
||||
// currently pointed by the iterator. You have to use the
|
||||
// 'Next' method to move to the next entry before calling
|
||||
// 'Get' to retreive the following instance.
|
||||
func (iterator IPairedBioSequenceBatch) Get() PairedBioSequenceBatch {
|
||||
return iterator.pointer.current
|
||||
}
|
||||
|
||||
// Finished returns 'true' value if no more data is available
|
||||
// from the iterator.
|
||||
func (iterator IPairedBioSequenceBatch) Finished() bool {
|
||||
return *iterator.pointer.p_finished
|
||||
}
|
||||
|
||||
func (iterator IPairedBioSequenceBatch) SortBatches(sizes ...int) IPairedBioSequenceBatch {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIPairedBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
next_to_send := 0
|
||||
received := make(map[int]PairedBioSequenceBatch)
|
||||
go func() {
|
||||
for iterator.Next() {
|
||||
batch := iterator.Get()
|
||||
if batch.order == next_to_send {
|
||||
newIter.pointer.channel <- batch
|
||||
next_to_send++
|
||||
batch, ok := received[next_to_send]
|
||||
for ok {
|
||||
newIter.pointer.channel <- batch
|
||||
delete(received, next_to_send)
|
||||
next_to_send++
|
||||
batch, ok = received[next_to_send]
|
||||
}
|
||||
} else {
|
||||
received[batch.order] = batch
|
||||
}
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
|
||||
}
|
||||
46
pkg/obiiter/pipe.go
Normal file
46
pkg/obiiter/pipe.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package obiiter
|
||||
|
||||
|
||||
// Pipeable is a processing step that takes a batch iterator as input and
// returns a batch iterator as output.
type Pipeable func(input IBioSequenceBatch) IBioSequenceBatch
|
||||
|
||||
func Pipeline(start Pipeable,parts ...Pipeable) Pipeable {
|
||||
p := func (input IBioSequenceBatch) IBioSequenceBatch {
|
||||
data := start(input)
|
||||
for _,part := range parts {
|
||||
data = part(data)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (input IBioSequenceBatch) Pipe(start Pipeable, parts ...Pipeable) IBioSequenceBatch {
|
||||
p := Pipeline(start,parts...)
|
||||
return p(input)
|
||||
}
|
||||
|
||||
|
||||
// Teeable is a processing step that takes a batch iterator as input and
// returns two batch iterators.
type Teeable func(input IBioSequenceBatch) (IBioSequenceBatch,IBioSequenceBatch)
|
||||
|
||||
func (input IBioSequenceBatch) CopyTee() (IBioSequenceBatch,IBioSequenceBatch) {
|
||||
first := MakeIBioSequenceBatch()
|
||||
second:= MakeIBioSequenceBatch()
|
||||
|
||||
first.Add(1)
|
||||
|
||||
go func() {
|
||||
first.WaitAndClose()
|
||||
second.Close()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for input.Next() {
|
||||
b:=input.Get()
|
||||
first.Push(b)
|
||||
second.Push(b)
|
||||
}
|
||||
}()
|
||||
|
||||
return first,second
|
||||
}
|
||||
48
pkg/obiiter/speed.go
Normal file
48
pkg/obiiter/speed.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package obiiter
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/schollz/progressbar/v3"
|
||||
)
|
||||
|
||||
func (iterator IBioSequenceBatch) Speed() IBioSequenceBatch {
|
||||
newIter := MakeIBioSequenceBatch()
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.WaitAndClose()
|
||||
}()
|
||||
|
||||
bar := progressbar.NewOptions(
|
||||
-1,
|
||||
progressbar.OptionSetWriter(os.Stderr),
|
||||
progressbar.OptionSetWidth(15),
|
||||
progressbar.OptionShowCount(),
|
||||
progressbar.OptionShowIts(),
|
||||
progressbar.OptionSetDescription("[Sequence Processing]"))
|
||||
|
||||
go func() {
|
||||
|
||||
for iterator.Next() {
|
||||
batch := iterator.Get()
|
||||
l := batch.Length()
|
||||
newIter.Push(batch)
|
||||
bar.Add(l)
|
||||
}
|
||||
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
|
||||
func SpeedPipe() Pipeable {
|
||||
f := func(iterator IBioSequenceBatch) IBioSequenceBatch {
|
||||
return iterator.Speed()
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
148
pkg/obiiter/workers.go
Normal file
148
pkg/obiiter/workers.go
Normal file
@@ -0,0 +1,148 @@
|
||||
package obiiter
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
|
||||
)
|
||||
|
||||
// SeqAnnotator is a function receiving a sequence and returning nothing;
// presumably it modifies the sequence it is given.
type SeqAnnotator func(*obiseq.BioSequence)
|
||||
|
||||
// SeqWorker is a function receiving a sequence and returning a sequence.
type SeqWorker func(*obiseq.BioSequence) *obiseq.BioSequence
|
||||
// SeqSliceWorker is a function receiving a slice of sequences and returning
// a slice of sequences.
type SeqSliceWorker func(obiseq.BioSequenceSlice) obiseq.BioSequenceSlice
|
||||
|
||||
func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker {
|
||||
f := func(seq *obiseq.BioSequence) *obiseq.BioSequence {
|
||||
function(seq)
|
||||
return seq
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
|
||||
|
||||
func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequenceBatch {
|
||||
nworkers := 4
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
nworkers = sizes[0]
|
||||
}
|
||||
|
||||
if len(sizes) > 1 {
|
||||
buffsize = sizes[1]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(nworkers)
|
||||
|
||||
go func() {
|
||||
newIter.WaitAndClose()
|
||||
log.Println("End of the batch workers")
|
||||
|
||||
}()
|
||||
|
||||
f := func(iterator IBioSequenceBatch) {
|
||||
for iterator.Next() {
|
||||
batch := iterator.Get()
|
||||
for i, seq := range batch.slice {
|
||||
batch.slice[i] = worker(seq)
|
||||
}
|
||||
newIter.Push(batch)
|
||||
}
|
||||
newIter.Done()
|
||||
}
|
||||
|
||||
log.Println("Start of the batch workers")
|
||||
for i := 0; i < nworkers-1; i++ {
|
||||
go f(iterator.Split())
|
||||
}
|
||||
go f(iterator)
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes ...int) IBioSequenceBatch {
|
||||
nworkers := 4
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
nworkers = sizes[0]
|
||||
}
|
||||
|
||||
if len(sizes) > 1 {
|
||||
buffsize = sizes[1]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequenceBatch(buffsize)
|
||||
|
||||
newIter.Add(nworkers)
|
||||
|
||||
go func() {
|
||||
newIter.WaitAndClose()
|
||||
log.Println("End of the batch slice workers")
|
||||
}()
|
||||
|
||||
f := func(iterator IBioSequenceBatch) {
|
||||
for iterator.Next() {
|
||||
batch := iterator.Get()
|
||||
batch.slice = worker(batch.slice)
|
||||
newIter.pointer.channel <- batch
|
||||
}
|
||||
newIter.Done()
|
||||
}
|
||||
|
||||
log.Println("Start of the batch slice workers")
|
||||
for i := 0; i < nworkers-1; i++ {
|
||||
go f(iterator.Split())
|
||||
}
|
||||
go f(iterator)
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
|
||||
func (iterator IBioSequence) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequence {
|
||||
buffsize := iterator.BufferSize()
|
||||
|
||||
if len(sizes) > 0 {
|
||||
buffsize = sizes[0]
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequence(buffsize)
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.Wait()
|
||||
close(newIter.pointer.channel)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for iterator.Next() {
|
||||
seq := iterator.Get()
|
||||
seq = worker(seq)
|
||||
newIter.pointer.channel <- seq
|
||||
}
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func WorkerPipe(worker SeqWorker, sizes ...int) Pipeable {
|
||||
f := func(iterator IBioSequenceBatch) IBioSequenceBatch {
|
||||
return iterator.MakeIWorker(worker,sizes...)
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
func SliceWorkerPipe(worker SeqSliceWorker, sizes ...int) Pipeable {
|
||||
f := func(iterator IBioSequenceBatch) IBioSequenceBatch {
|
||||
return iterator.MakeISliceWorker(worker,sizes...)
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
Reference in New Issue
Block a user