Files
obitools4/pkg/obichunk/unique.go
Eric Coissac b49aba9c09 Implémentation du filtrage unique basé sur séquence et catégories
Ajout d'une fonctionnalité pour le filtrage unique qui prend en compte à la fois la séquence et les catégories.

- Modification de la fonction ISequenceChunk pour accepter un classifieur unique optionnel
- Implémentation du traitement unique sur disque en utilisant un classifieur composite
- Mise à jour du classifieur utilisé pour le tri sur disque
- Correction de la gestion des clés de unicité en utilisant le code et la valeur du classifieur
- Mise à jour du numéro de commit
2026-01-14 19:18:17 +01:00

109 lines
2.4 KiB
Go

package obichunk
import (
"sync"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
)
// Runs dereplication algorithm on a obiiter.IBioSequenceBatch
// iterator.
func IUniqueSequence(iterator obiiter.IBioSequence,
options ...WithOption) (obiiter.IBioSequence, error) {
var err error
opts := MakeOptions(options)
nworkers := opts.ParallelWorkers()
iUnique := obiiter.MakeIBioSequence()
iterator = iterator.Speed("Splitting data set")
log.Infoln("Starting data splitting")
cat := opts.Categories()
na := opts.NAValue()
// Classifier for bucketing: Hash only to control number of chunks
bucketClassifier := obiseq.HashClassifier(opts.BatchCount())
// Classifier for uniqueness: Sequence + categories
var uniqueClassifier *obiseq.BioSequenceClassifier
if len(cat) > 0 {
cls := make([]*obiseq.BioSequenceClassifier, len(cat)+1)
cls[0] = obiseq.SequenceClassifier()
for i, c := range cat {
cls[i+1] = obiseq.AnnotationClassifier(c, na)
}
uniqueClassifier = obiseq.CompositeClassifier(cls...)
} else {
uniqueClassifier = obiseq.SequenceClassifier()
}
if opts.SortOnDisk() {
nworkers = 1
iterator, err = ISequenceChunkOnDisk(iterator, bucketClassifier, true, na, opts.StatsOn(), uniqueClassifier)
if err != nil {
return obiiter.NilIBioSequence, err
}
} else {
iterator, err = ISequenceChunkOnMemory(iterator, bucketClassifier)
if err != nil {
return obiiter.NilIBioSequence, err
}
}
log.Infoln("End of the data splitting")
iUnique.Add(nworkers)
go func() {
iUnique.Wait()
iUnique.Close()
}()
omutex := sync.Mutex{}
order := 0
nextOrder := func() int {
omutex.Lock()
neworder := order
order++
omutex.Unlock()
return neworder
}
ff := func(input obiiter.IBioSequence,
classifier *obiseq.BioSequenceClassifier) {
input, err = ISequenceSubChunk(input,
classifier,
1)
for input.Next() {
batch := input.Get()
if !(opts.NoSingleton() && len(batch.Slice()) == 1 && batch.Slice()[0].Count() == 1) {
iUnique.Push(batch.Reorder(nextOrder()))
}
}
iUnique.Done()
}
for i := 0; i < nworkers-1; i++ {
go ff(iterator.Split(), uniqueClassifier.Clone())
}
go ff(iterator, uniqueClassifier)
iMerged := iUnique.IMergeSequenceBatch(opts.NAValue(),
opts.StatsOn(),
)
return iMerged, nil
}