Big change in the data model, and a first version of obiuniq

This commit is contained in:
2022-02-21 19:00:23 +01:00
parent 9737f97084
commit 2e7c1834b0
43 changed files with 664 additions and 440 deletions

View File

@ -58,7 +58,7 @@ func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch,
}()
newIter.Wait()
close(newIter.Channel())
newIter.Close()
}()
obiformats.WriterDispatcher(dir+"/chunk_%s.fastx",
@ -78,14 +78,15 @@ func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch,
panic(err)
}
chunck := make(obiseq.BioSequenceSlice, 0, 10000)
//chunck := make(obiseq.BioSequenceSlice, 0, 10000)
chunck := obiseq.MakeBioSequenceSlice()
for iseq.Next() {
b := iseq.Get()
chunck = append(chunck, b.Slice()...)
b.Recycle()
}
newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunck...)
newIter.Push(obiseq.MakeBioSequenceBatch(order, chunck))
}

View File

@ -23,7 +23,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch,
go func() {
newIter.Wait()
close(newIter.Channel())
newIter.Close()
}()
go func() {
@ -43,7 +43,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch,
log.Fatalf("Cannot retreive the new chanel : %v", err)
}
chunk := obiseq.GetBioSequenceSlicePtr()
chunk := obiseq.NewBioSequenceSlice()
lock.Lock()
chunks[newflux] = chunk
lock.Unlock()
@ -64,7 +64,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch,
for _, chunck := range chunks {
if len(*chunck) > 0 {
newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, *chunck...)
newIter.Push(obiseq.MakeBioSequenceBatch(order, *chunck))
order++
}

View File

@ -1,11 +1,59 @@
package obichunk
import (
"sync"
"log"
"sort"
"sync/atomic"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
//
// Interface for sorting a list of sequences according to
// their classes
//
// sSS pairs a sequence with the class code its classifier assigned,
// so a batch can be reordered by class before being re-chunked.
type sSS struct {
	code int                 // class code returned by the classifier for seq
	seq  *obiseq.BioSequence // the classified sequence
}
// _By is the type of a "less" function that defines the ordering of its
// sSS arguments.
type _By func(p1, p2 *sSS) bool
// sSSSorter bundles the slice to sort with the "less" closure that
// orders it; it implements sort.Interface.
type sSSSorter struct {
	seqs []sSS
	by   _By // Closure used in the Less method.
}
// Len is part of sort.Interface. It reports the number of sequences to sort.
func (s *sSSSorter) Len() int {
	return len(s.seqs)
}
// Swap is part of sort.Interface. It exchanges the elements at indices i and j.
func (s *sSSSorter) Swap(i, j int) {
	s.seqs[i], s.seqs[j] = s.seqs[j], s.seqs[i]
}
// Less is part of sort.Interface. It is implemented by calling the "by"
// closure held by the sorter.
func (s *sSSSorter) Less(i, j int) bool {
	return s.by(&s.seqs[i], &s.seqs[j])
}
// Sort is a method on the function type _By that sorts the given slice
// according to the ordering defined by the receiver closure.
func (by _By) Sort(seqs []sSS) {
	// Pair the slice with its ordering closure and hand the resulting
	// sort.Interface implementation to the standard sorter.
	sort.Sort(&sSSSorter{seqs: seqs, by: by})
}
//
// End of the sort interface
//
func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch,
classifier *obiseq.BioSequenceClassifier,
sizes ...int) (obiseq.IBioSequenceBatch, error) {
@ -27,55 +75,76 @@ func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch,
go func() {
newIter.Wait()
close(newIter.Channel())
newIter.Close()
}()
omutex := sync.Mutex{}
order := 0
//omutex := sync.Mutex{}
order := int32(0)
nextOrder := func() int {
omutex.Lock()
neworder := order
order++
omutex.Unlock()
neworder := int(atomic.AddInt32(&order, 1))
return neworder
}
ff := func(iterator obiseq.IBioSequenceBatch) {
chunks := make(map[int]*obiseq.BioSequenceSlice, 100)
ff := func(iterator obiseq.IBioSequenceBatch,
classifier *obiseq.BioSequenceClassifier) {
ordered := make([]sSS, 100)
for iterator.Next() {
batch := iterator.Get()
for _, s := range batch.Slice() {
key := classifier.Code(s)
if batch.Length() > 1 {
classifier.Reset()
slice, ok := chunks[key]
if !ok {
slice = obiseq.GetBioSequenceSlicePtr()
chunks[key] = slice
if cap(ordered) < batch.Length() {
log.Println("Allocate a new ordered sequences : ", batch.Length())
ordered = make([]sSS, batch.Length())
} else {
ordered = ordered[:batch.Length()]
}
*slice = append(*slice, s)
}
for i, s := range batch.Slice() {
ordered[i].code = classifier.Code(s)
ordered[i].seq = s
batch.Slice()[i] = nil
}
for k, chunck := range chunks {
newIter.Channel() <- obiseq.MakeBioSequenceBatch(nextOrder(), *chunck...)
delete(chunks, k)
}
batch.Recycle()
batch.Recycle()
_By(func(p1, p2 *sSS) bool {
return p1.code < p2.code
}).Sort(ordered)
last := ordered[0].code
ss := obiseq.MakeBioSequenceSlice()
for i, v := range ordered {
if v.code != last {
newIter.Push(obiseq.MakeBioSequenceBatch(nextOrder(), ss))
ss = obiseq.MakeBioSequenceSlice()
last = v.code
}
ss = append(ss, v.seq)
ordered[i].seq = nil
}
if len(ss) > 0 {
newIter.Push(obiseq.MakeBioSequenceBatch(nextOrder(), ss))
}
} else {
newIter.Push(batch.Reorder(nextOrder()))
}
}
newIter.Done()
}
for i := 0; i < nworkers-1; i++ {
go ff(iterator.Split())
go ff(iterator.Split(), classifier.Clone())
}
go ff(iterator)
go ff(iterator, classifier)
return newIter, nil
}

View File

@ -11,10 +11,12 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch,
var err error
opts := MakeOptions(options)
nworkers := opts.ParallelWorkers()
iUnique := obiseq.MakeIBioSequenceBatch(opts.BufferSize())
if opts.SortOnDisk() {
nworkers = 1
iterator, err = ISequenceChunkOnDisk(iterator,
obiseq.HashClassifier(opts.BatchCount()),
opts.BufferSize())
@ -33,13 +35,11 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch,
}
}
nworkers := opts.ParallelWorkers()
iUnique.Add(nworkers)
go func() {
iUnique.Wait()
close(iUnique.Channel())
iUnique.Close()
}()
omutex := sync.Mutex{}
@ -58,14 +58,6 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch,
cat := opts.Categories()
na := opts.NAValue()
// ff = func(input obiseq.IBioSequenceBatch,
// classifier obiseq.BioSequenceClassifier,
// icat int) {
// log.Println(na, nextOrder)
// input.Recycle()
// iUnique.Done()
// }
ff = func(input obiseq.IBioSequenceBatch,
classifier *obiseq.BioSequenceClassifier,
icat int) {
@ -88,16 +80,17 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch,
o := 0
for input.Next() {
batch := input.Get()
if icat < 0 || len(batch.Slice()) == 1 {
iUnique.Channel() <- batch.Reorder(nextOrder())
iUnique.Push(batch.Reorder(nextOrder()))
} else {
next.Channel() <- batch.Reorder(o)
next.Push(batch.Reorder(o))
o++
}
}
if icat >= 0 {
close(next.Channel())
next.Close()
}
iUnique.Done()
@ -112,12 +105,10 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch,
obiseq.SequenceClassifier(),
len(cat))
iMerged := iUnique.MakeISliceWorker(
obiseq.MergeSliceWorker(
opts.NAValue(),
opts.StatsOn()...),
iMerged := iUnique.IMergeSequenceBatch(opts.NAValue(),
opts.StatsOn(),
opts.BufferSize(),
)
return iMerged.Rebatch(opts.BatchSize()), nil
return iMerged.Speed(), nil
}