From 9737f97084065f43138a0c2883d9ae937e38ed16 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 18 Feb 2022 22:53:09 +0100 Subject: [PATCH] before big changes --- cmd/obitools/obiuniq/main.go | 14 +-- pkg/obichunk/chunk_on_disk.go | 4 +- pkg/obichunk/chunks.go | 17 +-- pkg/obichunk/subchunks.go | 12 +-- pkg/obichunk/unique.go | 14 ++- pkg/obiformats/dispatcher.go | 4 +- pkg/obiformats/fastseq_read.go | 4 +- pkg/obiseq/batchiterator.go | 19 ++-- pkg/obiseq/biosequence.go | 14 +-- pkg/obiseq/class.go | 146 +++++++++++++++++++++----- pkg/obiseq/distribute.go | 27 ++--- pkg/obiseq/iterator.go | 4 +- pkg/obiseq/pool.go | 38 +++++-- pkg/obiseq/speed.go | 6 ++ pkg/obitools/obidistribute/options.go | 2 +- 15 files changed, 234 insertions(+), 91 deletions(-) create mode 100644 pkg/obiseq/speed.go diff --git a/cmd/obitools/obiuniq/main.go b/cmd/obitools/obiuniq/main.go index b63fea5..5b741e6 100644 --- a/cmd/obitools/obiuniq/main.go +++ b/cmd/obitools/obiuniq/main.go @@ -1,7 +1,9 @@ package main import ( + "log" "os" + "runtime/pprof" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -11,12 +13,12 @@ import ( func main() { // go tool pprof -http=":8000" ./obipairing ./cpu.pprof - // f, err := os.Create("cpu.pprof") - // if err != nil { - // log.Fatal(err) - // } - // pprof.StartCPUProfile(f) - // defer pprof.StopCPUProfile() + f, err := os.Create("cpu.pprof") + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() // go tool trace cpu.trace // ftrace, err := os.Create("cpu.trace") diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go index b0ed456..1be3580 100644 --- a/pkg/obichunk/chunk_on_disk.go +++ b/pkg/obichunk/chunk_on_disk.go @@ -34,7 +34,7 @@ func find(root, ext string) []string { } func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, - classifier obiseq.BioSequenceClassifier, + classifier *obiseq.BioSequenceClassifier, sizes ...int) (obiseq.IBioSequenceBatch, error) { dir, err := tempDir() if err != nil { @@ -78,7 +78,7 @@ func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, panic(err) } - chunck := make(obiseq.BioSequenceSlice, 0, 1000) + chunck := make(obiseq.BioSequenceSlice, 0, 10000) for iseq.Next() { b := iseq.Get() diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go index 50c8a29..677c2f2 100644 --- a/pkg/obichunk/chunks.go +++ b/pkg/obichunk/chunks.go @@ -8,7 +8,7 @@ import ( ) func ISequenceChunk(iterator obiseq.IBioSequenceBatch, - classifier obiseq.BioSequenceClassifier, + classifier *obiseq.BioSequenceClassifier, sizes ...int) (obiseq.IBioSequenceBatch, error) { bufferSize := iterator.BufferSize() @@ -32,27 +32,28 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, dispatcher := iterator.Distribute(classifier) jobDone := sync.WaitGroup{} - chunks := make(map[string]*obiseq.BioSequenceSlice, 100) + chunks := make(map[int]*obiseq.BioSequenceSlice, 1000) for newflux := range dispatcher.News() { jobDone.Add(1) - go func(newflux string) { + go func(newflux int) { data, err := dispatcher.Outputs(newflux) if err != nil { log.Fatalf("Cannot retreive the new chanel : %v", err) } - chunk := make(obiseq.BioSequenceSlice, 0, 1000) + chunk := obiseq.GetBioSequenceSlicePtr() + lock.Lock() + chunks[newflux] = chunk + lock.Unlock() for data.Next() { b := data.Get() - chunk = append(chunk, b.Slice()...) + *chunk = append(*chunk, b.Slice()...) + b.Recycle() } - lock.Lock() - chunks[newflux] = &chunk - lock.Unlock() jobDone.Done() }(newflux) } diff --git a/pkg/obichunk/subchunks.go b/pkg/obichunk/subchunks.go index 7d7a818..2d2ff61 100644 --- a/pkg/obichunk/subchunks.go +++ b/pkg/obichunk/subchunks.go @@ -7,7 +7,7 @@ import ( ) func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, - classifier obiseq.BioSequenceClassifier, + classifier *obiseq.BioSequenceClassifier, sizes ...int) (obiseq.IBioSequenceBatch, error) { bufferSize := iterator.BufferSize() @@ -42,33 +42,31 @@ func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, } ff := func(iterator obiseq.IBioSequenceBatch) { - chunks := make(map[string]*obiseq.BioSequenceSlice, 100) + chunks := make(map[int]*obiseq.BioSequenceSlice, 100) for iterator.Next() { batch := iterator.Get() for _, s := range batch.Slice() { - key := classifier(s) + key := classifier.Code(s) slice, ok := chunks[key] if !ok { - is := make(obiseq.BioSequenceSlice, 0, len(batch.Slice())) - slice = &is + slice = obiseq.GetBioSequenceSlicePtr() chunks[key] = slice } *slice = append(*slice, s) } - n := 0 for k, chunck := range chunks { - n += len(*chunck) newIter.Channel() <- obiseq.MakeBioSequenceBatch(nextOrder(), *chunck...) delete(chunks, k) } + batch.Recycle() } newIter.Done() diff --git a/pkg/obichunk/unique.go b/pkg/obichunk/unique.go index 49ce662..55e75e3 100644 --- a/pkg/obichunk/unique.go +++ b/pkg/obichunk/unique.go @@ -34,6 +34,7 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, } nworkers := opts.ParallelWorkers() + iUnique.Add(nworkers) go func() { @@ -52,17 +53,26 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, return neworder } - var ff func(obiseq.IBioSequenceBatch, obiseq.BioSequenceClassifier, int) + var ff func(obiseq.IBioSequenceBatch, *obiseq.BioSequenceClassifier, int) cat := opts.Categories() na := opts.NAValue() + // ff = func(input obiseq.IBioSequenceBatch, + // classifier obiseq.BioSequenceClassifier, + // icat int) { + // log.Println(na, nextOrder) + // input.Recycle() + // iUnique.Done() + // } + ff = func(input obiseq.IBioSequenceBatch, - classifier obiseq.BioSequenceClassifier, + classifier *obiseq.BioSequenceClassifier, icat int) { icat-- input, err = ISequenceSubChunk(input, classifier, + 1, opts.BufferSize()) var next obiseq.IBioSequenceBatch diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go index b069de0..c7f7a66 100644 --- a/pkg/obiformats/dispatcher.go +++ b/pkg/obiformats/dispatcher.go @@ -23,7 +23,7 @@ func WriterDispatcher(prototypename string, go func() { for newflux := range dispatcher.News() { jobDone.Add(1) - go func(newflux string) { + go func(newflux int) { data, err := dispatcher.Outputs(newflux) if err != nil { @@ -35,7 +35,7 @@ func WriterDispatcher(prototypename string, options...) if err != nil { - log.Fatalf("cannot open the output file for key %s", newflux) + log.Fatalf("cannot open the output file for key %d", newflux) } out.Recycle() diff --git a/pkg/obiformats/fastseq_read.go b/pkg/obiformats/fastseq_read.go index 12367ea..92eaf59 100644 --- a/pkg/obiformats/fastseq_read.go +++ b/pkg/obiformats/fastseq_read.go @@ -24,7 +24,7 @@ func _FastseqReader(seqfile C.fast_kseq_p, i := 0 ii := 0 - slice := make(obiseq.BioSequenceSlice, 0, batch_size) + slice := obiseq.GetBioSequenceSlice() for l := int64(C.next_fast_sek(seqfile)); l > 0; l = int64(C.next_fast_sek(seqfile)) { @@ -125,7 +125,7 @@ func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiseq.IB if parser != nil { return IParseFastSeqHeaderBatch(newIter, options...), err - } + } return newIter, err } diff --git a/pkg/obiseq/batchiterator.go b/pkg/obiseq/batchiterator.go index ccafc10..ea2c371 100644 --- a/pkg/obiseq/batchiterator.go +++ b/pkg/obiseq/batchiterator.go @@ -43,6 +43,11 @@ func (batch BioSequenceBatch) IsNil() bool { return batch.slice == nil } +func (batch BioSequenceBatch) Recycle() { + batch.slice.Recycle() + batch.slice = nil +} + // Structure implementing an iterator over bioseq.BioSequenceBatch // based on a channel. type _IBioSequenceBatch struct { @@ -343,7 +348,7 @@ func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBa go func() { order := 0 iterator = iterator.SortBatches() - buffer := make(BioSequenceSlice, 0, size) + buffer := GetBioSequenceSlice() for iterator.Next() { seqs := iterator.Get() @@ -352,9 +357,10 @@ func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBa if len(buffer) == size { newIter.Channel() <- MakeBioSequenceBatch(order, buffer...) order++ - buffer = make(BioSequenceSlice, 0, size) + buffer = GetBioSequenceSlice() } } + seqs.Recycle() } if len(buffer) > 0 { @@ -449,8 +455,8 @@ func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate, falseOrder := 0 iterator = iterator.SortBatches() - trueSlice := make(BioSequenceSlice, 0, size) - falseSlice := make(BioSequenceSlice, 0, size) + trueSlice := GetBioSequenceSlice() + falseSlice := GetBioSequenceSlice() for iterator.Next() { seqs := iterator.Get() @@ -464,15 +470,16 @@ func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate, if len(trueSlice) == size { trueIter.Channel() <- MakeBioSequenceBatch(trueOrder, trueSlice...) trueOrder++ - trueSlice = make(BioSequenceSlice, 0, size) + trueSlice = GetBioSequenceSlice() } if len(falseSlice) == size { falseIter.Channel() <- MakeBioSequenceBatch(falseOrder, falseSlice...) falseOrder++ - falseSlice = make(BioSequenceSlice, 0, size) + falseSlice = GetBioSequenceSlice() } } + seqs.Recycle() } if len(trueSlice) > 0 { diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index 42f71b1..a4f78ae 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -62,11 +62,11 @@ func (sequence *BioSequence) Recycle() { pseq := sequence.sequence if pseq != nil { - RecycleSlice(pseq.sequence) - RecycleSlice(pseq.feature) - RecycleSlice(pseq.qualities) + RecycleSlice(&pseq.sequence) + RecycleSlice(&pseq.feature) + RecycleSlice(&pseq.qualities) - RecycleAnnotation(pseq.annotations) + RecycleAnnotation(&pseq.annotations) } sequence.sequence = nil @@ -187,21 +187,21 @@ func (s BioSequence) SetDefinition(definition string) { func (s BioSequence) SetFeatures(feature []byte) { if cap(s.sequence.feature) >= 300 { - RecycleSlice(s.sequence.feature) + RecycleSlice(&s.sequence.feature) } s.sequence.feature = feature } func (s BioSequence) SetSequence(sequence []byte) { if s.sequence.sequence != nil { - RecycleSlice(s.sequence.sequence) + RecycleSlice(&s.sequence.sequence) } s.sequence.sequence = sequence } func (s BioSequence) SetQualities(qualities Quality) { if s.sequence.qualities != nil { - RecycleSlice(s.sequence.qualities) + RecycleSlice(&s.sequence.qualities) } s.sequence.qualities = qualities } diff --git a/pkg/obiseq/class.go b/pkg/obiseq/class.go index 2c75885..21353ad 100644 --- a/pkg/obiseq/class.go +++ b/pkg/obiseq/class.go @@ -3,71 +3,163 @@ package obiseq import ( "fmt" "hash/crc32" + "log" "strconv" + "sync" ) -type BioSequenceClassifier func(sequence BioSequence) string +type BioSequenceClassifier struct { + Code func(BioSequence) int + Value func(int) string +} -func AnnotationClassifier(key string, na string) BioSequenceClassifier { - f := func(sequence BioSequence) string { +//type BioSequenceClassifier func(sequence BioSequence) string + +func AnnotationClassifier(key string, na string) *BioSequenceClassifier { + encode := make(map[string]int, 1000) + decode := make([]string, 0, 1000) + locke := sync.RWMutex{} + maxcode := 0 + + code := func(sequence BioSequence) int { + var val string if sequence.HasAnnotation() { value, ok := sequence.Annotations()[key] if ok { switch value := value.(type) { case string: - return value + val = value default: - return fmt.Sprint(value) + val = fmt.Sprint(value) } - } + } } - return na + val = na + + locke.Lock() + defer locke.Unlock() + + k, ok := encode[val] + + if !ok { + k = maxcode + maxcode++ + encode[val] = k + decode = append(decode, val) + } + + return k } - return f + value := func(k int) string { + + locke.RLock() + defer locke.RUnlock() + if k >= maxcode { + log.Fatalf("value %d not register") + } + return decode[k] + } + + c := BioSequenceClassifier{code, value} + return &c } -func PredicateClassifier(predicate SequencePredicate) BioSequenceClassifier { - f := func(sequence BioSequence) string { +func PredicateClassifier(predicate SequencePredicate) *BioSequenceClassifier { + code := func(sequence BioSequence) int { if predicate(sequence) { - return "true" + return 1 } else { - return "false" + return 0 } + } - return f + value := func(k int) string { + if k == 0 { + return "false" + } else { + return "true" + } + + } + + c := BioSequenceClassifier{code, value} + return &c } // Builds a classifier function based on CRC32 of the sequence // -func HashClassifier(size int) BioSequenceClassifier { - f := func(sequence BioSequence) string { - h := crc32.ChecksumIEEE(sequence.Sequence()) % uint32(size) - return strconv.Itoa(int(h)) +func HashClassifier(size int) *BioSequenceClassifier { + code := func(sequence BioSequence) int { + return int(crc32.ChecksumIEEE(sequence.Sequence()) % uint32(size)) } - return f + value := func(k int) string { + return strconv.Itoa(k) + } + + c := BioSequenceClassifier{code, value} + return &c } // Builds a classifier function based on the sequence // -func SequenceClassifier() BioSequenceClassifier { - f := func(sequence BioSequence) string { - return sequence.String() +func SequenceClassifier() *BioSequenceClassifier { + encode := make(map[string]int, 1000) + decode := make([]string, 0, 1000) + locke := sync.RWMutex{} + maxcode := 0 + + code := func(sequence BioSequence) int { + val := sequence.String() + + locke.Lock() + defer locke.Unlock() + + k, ok := encode[val] + + if !ok { + k = maxcode + maxcode++ + encode[val] = k + decode = append(decode, val) + } + + return k } - return f + value := func(k int) string { + locke.RLock() + defer locke.RUnlock() + + if k >= maxcode { + log.Fatalf("value %d not register") + } + return decode[k] + } + + c := BioSequenceClassifier{code, value} + return &c } -func RotateClassifier(size int) BioSequenceClassifier { +func RotateClassifier(size int) *BioSequenceClassifier { n := 0 - f := func(sequence BioSequence) string { - h := n % size + lock := sync.Mutex{} + + code := func(sequence BioSequence) int { + lock.Lock() + defer lock.Unlock() + n = n % size n++ - return strconv.Itoa(int(h)) + return n } - return f + value := func(k int) string { + return strconv.Itoa(k) + } + + c := BioSequenceClassifier{code, value} + return &c } diff --git a/pkg/obiseq/distribute.go b/pkg/obiseq/distribute.go index 28cdb28..c4373f8 100644 --- a/pkg/obiseq/distribute.go +++ b/pkg/obiseq/distribute.go @@ -6,35 +6,35 @@ import ( ) type IDistribute struct { - outputs map[string]IBioSequenceBatch - news chan string + outputs map[int]IBioSequenceBatch + news chan int lock *sync.Mutex } -func (dist *IDistribute) Outputs(key string) (IBioSequenceBatch, error) { +func (dist *IDistribute) Outputs(key int) (IBioSequenceBatch, error) { dist.lock.Lock() iter, ok := dist.outputs[key] dist.lock.Unlock() if !ok { - return NilIBioSequenceBatch, fmt.Errorf("key %s unknown", key) + return NilIBioSequenceBatch, fmt.Errorf("code %d unknown", key) } return iter, nil } -func (dist *IDistribute) News() chan string { +func (dist *IDistribute) News() chan int { return dist.news } -func (iterator IBioSequenceBatch) Distribute(class BioSequenceClassifier, sizes ...int) IDistribute { +func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes ...int) IDistribute { batchsize := 5000 buffsize := 2 - outputs := make(map[string]IBioSequenceBatch, 100) - slices := make(map[string]*BioSequenceSlice, 100) - orders := make(map[string]int, 100) - news := make(chan string) + outputs := make(map[int]IBioSequenceBatch, 100) + slices := make(map[int]*BioSequenceSlice, 100) + orders := make(map[int]int, 100) + news := make(chan int) if len(sizes) > 0 { batchsize = sizes[0] @@ -63,11 +63,11 @@ func (iterator IBioSequenceBatch) Distribute(class BioSequenceClassifier, sizes for iterator.Next() { seqs := iterator.Get() for _, s := range seqs.Slice() { - key := class(s) + key := class.Code(s) slice, ok := slices[key] if !ok { - s := make(BioSequenceSlice, 0, batchsize) + s := GetBioSequenceSlice() slice = &s slices[key] = slice orders[key] = 0 @@ -84,10 +84,11 @@ func (iterator IBioSequenceBatch) Distribute(class BioSequenceClassifier, sizes if len(*slice) == batchsize { outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...) orders[key]++ - s := make(BioSequenceSlice, 0, batchsize) + s := GetBioSequenceSlice() slices[key] = &s } } + seqs.Recycle() } for key, slice := range slices { diff --git a/pkg/obiseq/iterator.go b/pkg/obiseq/iterator.go index e7af044..c92ff1d 100644 --- a/pkg/obiseq/iterator.go +++ b/pkg/obiseq/iterator.go @@ -166,7 +166,7 @@ func (iterator IBioSequence) IBioSequenceBatch(sizes ...int) IBioSequenceBatch { go func() { for j := 0; !iterator.Finished(); j++ { batch := BioSequenceBatch{ - slice: make(BioSequenceSlice, 0, batchsize), + slice: GetBioSequenceSlice(), order: j} for i := 0; i < batchsize && iterator.Next(); i++ { seq := iterator.Get() @@ -280,7 +280,7 @@ func (iterator IBioSequence) Tail(n int, sizes ...int) IBioSequence { } newIter := MakeIBioSequence(buffsize) - buffseq := make(BioSequenceSlice, n) + buffseq := GetBioSequenceSlice() newIter.Add(1) diff --git a/pkg/obiseq/pool.go b/pkg/obiseq/pool.go index c822099..de8cd5f 100644 --- a/pkg/obiseq/pool.go +++ b/pkg/obiseq/pool.go @@ -13,9 +13,9 @@ var _BioSequenceByteSlicePool = sync.Pool{ }, } -func RecycleSlice(s []byte) { - s0 := s[:0] - _BioSequenceByteSlicePool.Put(&s0) +func RecycleSlice(s *[]byte) { + *s = (*s)[:0] + _BioSequenceByteSlicePool.Put(s) } func GetSlice(values ...byte) []byte { @@ -35,10 +35,10 @@ var BioSequenceAnnotationPool = sync.Pool{ }, } -func RecycleAnnotation(a Annotation) { +func RecycleAnnotation(a *Annotation) { if a != nil { - for k := range a { - delete(a, k) + for k := range *a { + delete(*a, k) } BioSequenceAnnotationPool.Put(&(a)) } @@ -54,6 +54,32 @@ func GetAnnotation(values ...Annotation) Annotation { return a } +var _BioSequenceSlicePool = sync.Pool{ + New: func() interface{} { + bs := make(BioSequenceSlice, 0, 5000) + return &bs + }, +} + +func (s *BioSequenceSlice) Recycle() { + *s = (*s)[:0] + _BioSequenceSlicePool.Put(s) +} + +func GetBioSequenceSlicePtr(values ...BioSequence) *BioSequenceSlice { + s := _BioSequenceSlicePool.Get().(*BioSequenceSlice) + + if len(values) > 0 { + *s = append(*s, values...) + } + + return s +} + +func GetBioSequenceSlice(values ...BioSequence) BioSequenceSlice { + return *GetBioSequenceSlicePtr(values...) +} + // var __bioseq__pool__ = sync.Pool{ // New: func() interface{} { // var bs _BioSequence diff --git a/pkg/obiseq/speed.go b/pkg/obiseq/speed.go new file mode 100644 index 0000000..198b4e4 --- /dev/null +++ b/pkg/obiseq/speed.go @@ -0,0 +1,6 @@ +package obiseq + +func (iterator IBioSequenceBatch) speed() IBioSequenceBatch { + newIter := MakeIBioSequenceBatch() + return newIter +} diff --git a/pkg/obitools/obidistribute/options.go b/pkg/obitools/obidistribute/options.go index 7a637bd..42c63c0 100644 --- a/pkg/obitools/obidistribute/options.go +++ b/pkg/obitools/obidistribute/options.go @@ -48,7 +48,7 @@ func OptionSet(options *getoptions.GetOpt) { DistributeOptionSet(options) } -func CLISequenceClassifier() obiseq.BioSequenceClassifier { +func CLISequenceClassifier() *obiseq.BioSequenceClassifier { switch { case _SequenceClassifierTag != "": return obiseq.AnnotationClassifier(_SequenceClassifierTag, _NAValue)