2022-02-18 09:58:08 +01:00
|
|
|
package obichunk
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"sync"
|
|
|
|
|
|
2022-02-24 12:14:52 +01:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
|
2023-11-29 12:14:37 +01:00
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
|
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
2022-02-18 09:58:08 +01:00
|
|
|
)
|
|
|
|
|
|
2022-05-30 16:28:59 +02:00
|
|
|
// Runs dereplication algorithm on a obiiter.IBioSequenceBatch
|
2022-08-21 14:47:22 +02:00
|
|
|
// iterator.
|
2022-05-30 16:28:59 +02:00
|
|
|
|
2023-01-22 22:04:17 +01:00
|
|
|
func IUniqueSequence(iterator obiiter.IBioSequence,
|
|
|
|
|
options ...WithOption) (obiiter.IBioSequence, error) {
|
2022-02-18 09:58:08 +01:00
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
opts := MakeOptions(options)
|
2022-02-21 19:00:23 +01:00
|
|
|
nworkers := opts.ParallelWorkers()
|
2022-02-18 09:58:08 +01:00
|
|
|
|
2023-03-07 11:12:13 +07:00
|
|
|
iUnique := obiiter.MakeIBioSequence()
|
2022-02-18 09:58:08 +01:00
|
|
|
|
2022-02-24 12:14:52 +01:00
|
|
|
iterator = iterator.Speed("Splitting data set")
|
|
|
|
|
|
|
|
|
|
log.Infoln("Starting data splitting")
|
|
|
|
|
|
2025-12-03 11:48:50 +01:00
|
|
|
cat := opts.Categories()
|
|
|
|
|
na := opts.NAValue()
|
|
|
|
|
|
2026-01-14 19:18:08 +01:00
|
|
|
// Classifier for bucketing: Hash only to control number of chunks
|
|
|
|
|
bucketClassifier := obiseq.HashClassifier(opts.BatchCount())
|
2025-12-03 11:48:50 +01:00
|
|
|
|
2026-01-14 19:18:08 +01:00
|
|
|
// Classifier for uniqueness: Sequence + categories
|
|
|
|
|
var uniqueClassifier *obiseq.BioSequenceClassifier
|
2025-12-03 11:48:50 +01:00
|
|
|
if len(cat) > 0 {
|
|
|
|
|
cls := make([]*obiseq.BioSequenceClassifier, len(cat)+1)
|
2026-01-14 19:18:08 +01:00
|
|
|
cls[0] = obiseq.SequenceClassifier()
|
2025-12-03 11:48:50 +01:00
|
|
|
for i, c := range cat {
|
|
|
|
|
cls[i+1] = obiseq.AnnotationClassifier(c, na)
|
|
|
|
|
}
|
2026-01-14 19:18:08 +01:00
|
|
|
uniqueClassifier = obiseq.CompositeClassifier(cls...)
|
2025-12-03 11:48:50 +01:00
|
|
|
} else {
|
2026-01-14 19:18:08 +01:00
|
|
|
uniqueClassifier = obiseq.SequenceClassifier()
|
2025-12-03 11:48:50 +01:00
|
|
|
}
|
|
|
|
|
|
2022-02-18 09:58:08 +01:00
|
|
|
if opts.SortOnDisk() {
|
2022-02-21 19:00:23 +01:00
|
|
|
nworkers = 1
|
2026-01-14 19:18:08 +01:00
|
|
|
iterator, err = ISequenceChunkOnDisk(iterator, bucketClassifier, true, na, opts.StatsOn(), uniqueClassifier)
|
2022-02-18 09:58:08 +01:00
|
|
|
|
|
|
|
|
if err != nil {
|
2023-01-22 22:04:17 +01:00
|
|
|
return obiiter.NilIBioSequence, err
|
2022-02-18 09:58:08 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
2026-01-14 19:18:08 +01:00
|
|
|
iterator, err = ISequenceChunkOnMemory(iterator, bucketClassifier)
|
2022-02-18 09:58:08 +01:00
|
|
|
|
|
|
|
|
if err != nil {
|
2023-01-22 22:04:17 +01:00
|
|
|
return obiiter.NilIBioSequence, err
|
2022-02-18 09:58:08 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-24 12:14:52 +01:00
|
|
|
log.Infoln("End of the data splitting")
|
|
|
|
|
|
2022-02-18 09:58:08 +01:00
|
|
|
iUnique.Add(nworkers)
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
iUnique.Wait()
|
2022-02-21 19:00:23 +01:00
|
|
|
iUnique.Close()
|
2022-02-18 09:58:08 +01:00
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
omutex := sync.Mutex{}
|
|
|
|
|
order := 0
|
|
|
|
|
|
|
|
|
|
nextOrder := func() int {
|
|
|
|
|
omutex.Lock()
|
|
|
|
|
neworder := order
|
|
|
|
|
order++
|
|
|
|
|
omutex.Unlock()
|
|
|
|
|
return neworder
|
|
|
|
|
}
|
|
|
|
|
|
2025-12-03 11:48:50 +01:00
|
|
|
ff := func(input obiiter.IBioSequence,
|
|
|
|
|
classifier *obiseq.BioSequenceClassifier) {
|
2022-02-18 09:58:08 +01:00
|
|
|
input, err = ISequenceSubChunk(input,
|
|
|
|
|
classifier,
|
2023-03-07 11:12:13 +07:00
|
|
|
1)
|
2022-02-18 09:58:08 +01:00
|
|
|
|
|
|
|
|
for input.Next() {
|
|
|
|
|
batch := input.Get()
|
2025-12-03 11:48:50 +01:00
|
|
|
if !(opts.NoSingleton() && len(batch.Slice()) == 1 && batch.Slice()[0].Count() == 1) {
|
|
|
|
|
iUnique.Push(batch.Reorder(nextOrder()))
|
2022-02-18 09:58:08 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
iUnique.Done()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i := 0; i < nworkers-1; i++ {
|
2026-01-14 19:18:08 +01:00
|
|
|
go ff(iterator.Split(), uniqueClassifier.Clone())
|
2022-02-18 09:58:08 +01:00
|
|
|
}
|
2026-01-14 19:18:08 +01:00
|
|
|
go ff(iterator, uniqueClassifier)
|
2022-02-18 09:58:08 +01:00
|
|
|
|
2022-02-21 19:00:23 +01:00
|
|
|
iMerged := iUnique.IMergeSequenceBatch(opts.NAValue(),
|
|
|
|
|
opts.StatsOn(),
|
2022-02-18 09:58:08 +01:00
|
|
|
)
|
|
|
|
|
|
2023-02-23 23:35:58 +01:00
|
|
|
return iMerged, nil
|
2022-02-18 09:58:08 +01:00
|
|
|
}
|