From f00456fcf35914f5880e40957b54127762d72ee0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sun, 21 Aug 2022 13:41:15 +0200 Subject: [PATCH] Add a second way to merge several batch iterators using the pool method. --- pkg/obiiter/batchiterator.go | 58 +++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 4c735c1..ae15ebe 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -34,7 +34,6 @@ type IBioSequenceBatch struct { // // NilIBioSequenceBatch is the nil instance for the // IBioSequenceBatch type. -// var NilIBioSequenceBatch = IBioSequenceBatch{pointer: nil} func MakeIBioSequenceBatch(sizes ...int) IBioSequenceBatch { @@ -329,8 +328,7 @@ func (iterator IBioSequenceBatch) Concat(iterators ...IBioSequenceBatch) IBioSeq newIter.Add(1) go func() { - newIter.Wait() - close(newIter.Channel()) + newIter.WaitAndClose() }() go func() { @@ -363,6 +361,52 @@ func (iterator IBioSequenceBatch) Concat(iterators ...IBioSequenceBatch) IBioSeq return newIter } +func (iterator IBioSequenceBatch) Pool(iterators ...IBioSequenceBatch) IBioSequenceBatch { + + niterator := len(iterators) + 1 + + if niterator == 1 { + return iterator + } + + counterMutex := sync.Mutex{} + counter := 0 + + nextCounter := func() int { + counterMutex.Lock() + defer counterMutex.Unlock() + + counter++ + + return counter + } + + buffsize := iterator.BufferSize() + newIter := MakeIBioSequenceBatch(buffsize) + + newIter.Add(niterator) + + go func() { + newIter.WaitAndClose() + }() + + ff := func(iterator IBioSequenceBatch) { + + for iterator.Next() { + s := iterator.Get() + newIter.Push(s.Reorder(nextCounter())) + } + newIter.Done() + } + + go ff(iterator) + for _, i := range iterators { + go ff(i) + } + + return newIter +} + // Redistributes sequences from a IBioSequenceBatch into a new // IBioSequenceBatch with every batches having the same size // indicated in parameter. Rebatching implies to sort the @@ -620,7 +664,7 @@ func (iterator IBioSequenceBatch) FilterOn(predicate obiseq.SequencePredicate, return trueIter.Rebatch(size) } -// Load every sequences availables from an IBioSequenceBatch iterator into +// Load every sequences availables from an IBioSequenceBatch iterator into // a large obiseq.BioSequenceSlice. func (iterator IBioSequenceBatch) Load() obiseq.BioSequenceSlice { @@ -657,12 +701,12 @@ func IBatchOver(data obiseq.BioSequenceSlice, ldata := len(data) batchid := 0 next := 0 - for i:=0; i < ldata; i=next { + for i := 0; i < ldata; i = next { next = i + size if next > ldata { next = ldata - } - trueIter.Push(MakeBioSequenceBatch(batchid,data[i:next])) + } + trueIter.Push(MakeBioSequenceBatch(batchid, data[i:next])) batchid++ }