Files
obitools4/pkg/obichunk/unique.go

116 lines
2.1 KiB
Go
Raw Normal View History

2022-02-18 09:58:08 +01:00
package obichunk
import (
	"log"
	"sync"

	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
func IUniqueSequence(iterator obiiter.IBioSequenceBatch,
options ...WithOption) (obiiter.IBioSequenceBatch, error) {
2022-02-18 09:58:08 +01:00
var err error
opts := MakeOptions(options)
nworkers := opts.ParallelWorkers()
2022-02-18 09:58:08 +01:00
iUnique := obiiter.MakeIBioSequenceBatch(opts.BufferSize())
2022-02-18 09:58:08 +01:00
if opts.SortOnDisk() {
nworkers = 1
2022-02-18 09:58:08 +01:00
iterator, err = ISequenceChunkOnDisk(iterator,
obiseq.HashClassifier(opts.BatchCount()),
0)
2022-02-18 09:58:08 +01:00
if err != nil {
return obiiter.NilIBioSequenceBatch, err
2022-02-18 09:58:08 +01:00
}
} else {
iterator, err = ISequenceChunk(iterator,
obiseq.HashClassifier(opts.BatchCount()),
opts.BufferSize())
if err != nil {
return obiiter.NilIBioSequenceBatch, err
2022-02-18 09:58:08 +01:00
}
}
iUnique.Add(nworkers)
go func() {
iUnique.Wait()
iUnique.Close()
2022-02-18 09:58:08 +01:00
}()
omutex := sync.Mutex{}
order := 0
nextOrder := func() int {
omutex.Lock()
neworder := order
order++
omutex.Unlock()
return neworder
}
var ff func(obiiter.IBioSequenceBatch, *obiseq.BioSequenceClassifier, int)
2022-02-18 09:58:08 +01:00
cat := opts.Categories()
na := opts.NAValue()
ff = func(input obiiter.IBioSequenceBatch,
2022-02-18 22:53:09 +01:00
classifier *obiseq.BioSequenceClassifier,
2022-02-18 09:58:08 +01:00
icat int) {
icat--
input, err = ISequenceSubChunk(input,
classifier,
2022-02-18 22:53:09 +01:00
1,
2022-02-18 09:58:08 +01:00
opts.BufferSize())
var next obiiter.IBioSequenceBatch
2022-02-18 09:58:08 +01:00
if icat >= 0 {
next = obiiter.MakeIBioSequenceBatch(opts.BufferSize())
2022-02-18 09:58:08 +01:00
iUnique.Add(1)
go ff(next,
obiseq.AnnotationClassifier(cat[icat], na),
icat)
}
o := 0
for input.Next() {
batch := input.Get()
2022-02-18 09:58:08 +01:00
if icat < 0 || len(batch.Slice()) == 1 {
iUnique.Push(batch.Reorder(nextOrder()))
2022-02-18 09:58:08 +01:00
} else {
next.Push(batch.Reorder(o))
2022-02-18 09:58:08 +01:00
o++
}
}
if icat >= 0 {
next.Close()
2022-02-18 09:58:08 +01:00
}
iUnique.Done()
}
for i := 0; i < nworkers-1; i++ {
go ff(iterator.Split(),
obiseq.SequenceClassifier(),
len(cat))
}
go ff(iterator,
obiseq.SequenceClassifier(),
len(cat))
iMerged := iUnique.IMergeSequenceBatch(opts.NAValue(),
opts.StatsOn(),
2022-02-18 09:58:08 +01:00
opts.BufferSize(),
)
return iMerged.Speed(), nil
2022-02-18 09:58:08 +01:00
}