diff --git a/cmd/obitools/obicount/main.go b/cmd/obitools/obicount/main.go index d4d04e8..90893e4 100644 --- a/cmd/obitools/obicount/main.go +++ b/cmd/obitools/obicount/main.go @@ -40,13 +40,13 @@ func main() { nsymbol := 0 for fs.Next() { s := fs.Get() - if s.IsNil() { + if s==nil { log.Panicln("Read sequence is nil") } nread += s.Count() nvariant++ nsymbol += s.Length() - (&s).Recycle() + s.Recycle() } if obicount.CLIIsPrintingVariantCount() { diff --git a/cmd/obitools/obimultiplex/main.go b/cmd/obitools/obimultiplex/main.go index 132d06a..2805e3d 100644 --- a/cmd/obitools/obimultiplex/main.go +++ b/cmd/obitools/obimultiplex/main.go @@ -1,7 +1,9 @@ package main import ( + "log" "os" + "runtime/pprof" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -10,12 +12,12 @@ import ( func main() { - // f, err := os.Create("cpu.pprof") - // if err != nil { - // log.Fatal(err) - // } - // pprof.StartCPUProfile(f) - // defer pprof.StopCPUProfile() + f, err := os.Create("cpu.pprof") + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() // ftrace, err := os.Create("cpu.trace") // if err != nil { diff --git a/cmd/obitools/obiuniq/main.go b/cmd/obitools/obiuniq/main.go index 5b741e6..f2d6b82 100644 --- a/cmd/obitools/obiuniq/main.go +++ b/cmd/obitools/obiuniq/main.go @@ -6,12 +6,15 @@ import ( "runtime/pprof" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiuniq" ) func main() { + defer obiseq.LogBioSeqStatus() + // go tool pprof -http=":8000" ./obipairing ./cpu.pprof f, err := os.Create("cpu.pprof") if err != nil { diff --git a/cmd/test/main.go b/cmd/test/main.go index b328442..f54046c 100644 --- a/cmd/test/main.go +++ b/cmd/test/main.go @@ -43,7 +43,7 @@ func main() { A := []byte("ccgcctccttagaacaggctcctctagaaaaccatagtgggatatctaaagaaggcggagatagaaagagcggttcagcaggaatgccgagatggacggcgtgtgacg") // B := []byte("cgccaccaccgagatctacactctttccctacacgacgctcttccgatctccgcctccttagaacaggctcctctagaaaagcatagtggggtatctaaaggaggcgg") - sA := obiseq.MakeBioSequence("A", A, "") + sA := obiseq.NewBioSequence("A", A, "") // sB := obiseq.MakeBioSequence("B", B, "") pat, _ := obiapat.MakeApatPattern("TCCTTCCAACAGGCTCCTC", 3) diff --git a/pkg/obialign/alignment.go b/pkg/obialign/alignment.go index 6bf507b..58edb1d 100644 --- a/pkg/obialign/alignment.go +++ b/pkg/obialign/alignment.go @@ -65,24 +65,24 @@ func _BuildAlignment(seqA, seqB []byte, path []int, gap byte, bufferA, bufferB * // In that case an arena will be allocated by the function but, it will not // be reusable for other alignments and desallocated at the BuildAlignment // return. -func BuildAlignment(seqA, seqB obiseq.BioSequence, - path []int, gap byte) (obiseq.BioSequence, obiseq.BioSequence) { +func BuildAlignment(seqA, seqB *obiseq.BioSequence, + path []int, gap byte) (*obiseq.BioSequence, *obiseq.BioSequence) { bufferSA := obiseq.GetSlice() - defer obiseq.RecycleSlice(bufferSA) + defer obiseq.RecycleSlice(&bufferSA) bufferSB := obiseq.GetSlice() - defer obiseq.RecycleSlice(bufferSB) + defer obiseq.RecycleSlice(&bufferSB) _BuildAlignment(seqA.Sequence(), seqB.Sequence(), path, gap, &bufferSA, &bufferSB) - seqA = obiseq.MakeBioSequence(seqA.Id(), + seqA = obiseq.NewBioSequence(seqA.Id(), bufferSA, seqA.Definition()) - seqB = obiseq.MakeBioSequence(seqB.Id(), + seqB = obiseq.NewBioSequence(seqB.Id(), bufferSB, seqB.Definition()) @@ -110,15 +110,15 @@ func BuildAlignment(seqA, seqB obiseq.BioSequence, // In that case arenas will be allocated by the function but, they will not // be reusable for other alignments and desallocated at the BuildQualityConsensus // return. -func BuildQualityConsensus(seqA, seqB obiseq.BioSequence, path []int) (obiseq.BioSequence, int) { +func BuildQualityConsensus(seqA, seqB *obiseq.BioSequence, path []int) (*obiseq.BioSequence, int) { bufferSA := obiseq.GetSlice() bufferSB := obiseq.GetSlice() - defer obiseq.RecycleSlice(bufferSB) + defer obiseq.RecycleSlice(&bufferSB) bufferQA := obiseq.GetSlice() bufferQB := obiseq.GetSlice() - defer obiseq.RecycleSlice(bufferQB) + defer obiseq.RecycleSlice(&bufferQB) _BuildAlignment(seqA.Sequence(), seqB.Sequence(), path, ' ', &bufferSA, &bufferSB) @@ -178,7 +178,7 @@ func BuildQualityConsensus(seqA, seqB obiseq.BioSequence, path []int) (obiseq.Bi bufferQA[i] = q } - consSeq := obiseq.MakeBioSequence( + consSeq := obiseq.NewBioSequence( seqA.Id(), bufferSA, seqA.Definition(), diff --git a/pkg/obialign/fourbitsencode.go b/pkg/obialign/fourbitsencode.go index a106175..d65d1c0 100644 --- a/pkg/obialign/fourbitsencode.go +++ b/pkg/obialign/fourbitsencode.go @@ -57,7 +57,7 @@ var _FourBitsBaseDecode = []byte{ // by the ambiguity set to 1. // A byte slice can be provided (buffer) to preveent allocation of a new // memory chunk by th function. -func Encode4bits(seq obiseq.BioSequence, buffer []byte) []byte { +func Encode4bits(seq *obiseq.BioSequence, buffer []byte) []byte { length := seq.Length() rawseq := seq.Sequence() diff --git a/pkg/obialign/pairedendalign.go b/pkg/obialign/pairedendalign.go index 1a60c2b..0ab4e63 100644 --- a/pkg/obialign/pairedendalign.go +++ b/pkg/obialign/pairedendalign.go @@ -220,7 +220,7 @@ func _FillMatrixPeRightAlign(seqA, qualA, seqB, qualB []byte, gap float64, return _GetMatrix(scoreMatrix, la, la-1, lb1) } -func PELeftAlign(seqA, seqB obiseq.BioSequence, gap float64, +func PELeftAlign(seqA, seqB *obiseq.BioSequence, gap float64, arena PEAlignArena) (int, []int) { if !_InitializedDnaScore { @@ -244,7 +244,7 @@ func PELeftAlign(seqA, seqB obiseq.BioSequence, gap float64, return score, arena.pointer.path } -func PERightAlign(seqA, seqB obiseq.BioSequence, gap float64, +func PERightAlign(seqA, seqB *obiseq.BioSequence, gap float64, arena PEAlignArena) (int, []int) { if !_InitializedDnaScore { @@ -268,7 +268,7 @@ func PERightAlign(seqA, seqB obiseq.BioSequence, gap float64, return score, arena.pointer.path } -func PEAlign(seqA, seqB obiseq.BioSequence, +func PEAlign(seqA, seqB *obiseq.BioSequence, gap float64, delta int, arena PEAlignArena) (int, []int) { var score, shift int diff --git a/pkg/obiapat/pattern.go b/pkg/obiapat/pattern.go index 061978e..e8f0419 100644 --- a/pkg/obiapat/pattern.go +++ b/pkg/obiapat/pattern.go @@ -151,7 +151,7 @@ func (pattern ApatPattern) Print() { // at the junction. To limit memory allocation, it is possible to provide // an already allocated ApatSequence to recycle its allocated memory. // The provided sequence is no more usable after the call. -func MakeApatSequence(sequence obiseq.BioSequence, circular bool, recycle ...ApatSequence) (ApatSequence, error) { +func MakeApatSequence(sequence *obiseq.BioSequence, circular bool, recycle ...ApatSequence) (ApatSequence, error) { var errno C.int32_t var errmsg *C.char seqlen := sequence.Length() diff --git a/pkg/obiapat/pcr.go b/pkg/obiapat/pcr.go index e2da5fc..441412b 100644 --- a/pkg/obiapat/pcr.go +++ b/pkg/obiapat/pcr.go @@ -218,7 +218,7 @@ func OptionBatchSize(size int) WithOption { } func _Pcr(seq ApatSequence, - sequence obiseq.BioSequence, + sequence *obiseq.BioSequence, opt Options) obiseq.BioSequenceSlice { results := make(obiseq.BioSequenceSlice, 0, 10) @@ -278,7 +278,7 @@ func _Pcr(seq ApatSequence, match, _ := sequence.Subsequence(fm[0], fm[1], opt.pointer.circular) annot["forward_match"] = match.String() - (&match).Recycle() + match.Recycle() annot["forward_error"] = erri @@ -286,7 +286,7 @@ func _Pcr(seq ApatSequence, match, _ = sequence.Subsequence(rm[0], rm[1], opt.pointer.circular) match = match.ReverseComplement(true) annot["reverse_match"] = match.String() - (&match).Recycle() + match.Recycle() annot["reverse_error"] = errj results = append(results, amplicon) @@ -351,14 +351,14 @@ func _Pcr(seq ApatSequence, match, _ := sequence.Subsequence(rm[0], rm[1], opt.pointer.circular) match.ReverseComplement(true) annot["forward_match"] = match.String() - (&match).Recycle() + match.Recycle() annot["forward_error"] = errj annot["reverse_primer"] = reverse.String() match, _ = sequence.Subsequence(fm[0], fm[1], opt.pointer.circular) annot["reverse_match"] = match.String() - (&match).Recycle() + match.Recycle() annot["reverse_error"] = erri results = append(results, amplicon) @@ -376,7 +376,7 @@ func _Pcr(seq ApatSequence, // obiseq.BioSequence instance. PCR parameters are // specified using the corresponding Option functions // defined for the PCR algorithm. -func PCRSim(sequence obiseq.BioSequence, options ...WithOption) obiseq.BioSequenceSlice { +func PCRSim(sequence *obiseq.BioSequence, options ...WithOption) obiseq.BioSequenceSlice { opt := MakeOptions(options) diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go index 1be3580..5635e44 100644 --- a/pkg/obichunk/chunk_on_disk.go +++ b/pkg/obichunk/chunk_on_disk.go @@ -58,7 +58,7 @@ func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, }() newIter.Wait() - close(newIter.Channel()) + newIter.Close() }() obiformats.WriterDispatcher(dir+"/chunk_%s.fastx", @@ -78,14 +78,15 @@ func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, panic(err) } - chunck := make(obiseq.BioSequenceSlice, 0, 10000) - + //chunck := make(obiseq.BioSequenceSlice, 0, 10000) + chunck := obiseq.MakeBioSequenceSlice() for iseq.Next() { b := iseq.Get() chunck = append(chunck, b.Slice()...) + b.Recycle() } - newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunck...) + newIter.Push(obiseq.MakeBioSequenceBatch(order, chunck)) } diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go index 677c2f2..54c7bc6 100644 --- a/pkg/obichunk/chunks.go +++ b/pkg/obichunk/chunks.go @@ -23,7 +23,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, go func() { newIter.Wait() - close(newIter.Channel()) + newIter.Close() }() go func() { @@ -43,7 +43,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, log.Fatalf("Cannot retreive the new chanel : %v", err) } - chunk := obiseq.GetBioSequenceSlicePtr() + chunk := obiseq.NewBioSequenceSlice() lock.Lock() chunks[newflux] = chunk lock.Unlock() @@ -64,7 +64,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, for _, chunck := range chunks { if len(*chunck) > 0 { - newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, *chunck...) + newIter.Push(obiseq.MakeBioSequenceBatch(order, *chunck)) order++ } diff --git a/pkg/obichunk/subchunks.go b/pkg/obichunk/subchunks.go index 2d2ff61..281d588 100644 --- a/pkg/obichunk/subchunks.go +++ b/pkg/obichunk/subchunks.go @@ -1,11 +1,59 @@ package obichunk import ( - "sync" + "log" + "sort" + "sync/atomic" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) +// +// Interface for sorting a list of sequences accoording to +// their classes +// + +type sSS struct { + code int + seq *obiseq.BioSequence +} + +// By is the type of a "less" function that defines the ordering of its Planet arguments. +type _By func(p1, p2 *sSS) bool + +type sSSSorter struct { + seqs []sSS + by _By // Closure used in the Less method. +} + +// Len is part of sort.Interface. +func (s *sSSSorter) Len() int { + return len(s.seqs) +} + +// Swap is part of sort.Interface. +func (s *sSSSorter) Swap(i, j int) { + s.seqs[i], s.seqs[j] = s.seqs[j], s.seqs[i] +} + +// Less is part of sort.Interface. It is implemented by calling the "by" closure in the sorter. +func (s *sSSSorter) Less(i, j int) bool { + return s.by(&s.seqs[i], &s.seqs[j]) +} + +// Sort is a method on the function type, By, that sorts the argument slice according to the function. +func (by _By) Sort(seqs []sSS) { + ps := &sSSSorter{ + seqs: seqs, + by: by, // The Sort method's receiver is the function (closure) that defines the sort order. + } + sort.Sort(ps) +} + +// +// End of the sort interface +// + func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, classifier *obiseq.BioSequenceClassifier, sizes ...int) (obiseq.IBioSequenceBatch, error) { @@ -27,55 +75,76 @@ func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, go func() { newIter.Wait() - close(newIter.Channel()) + newIter.Close() }() - omutex := sync.Mutex{} - order := 0 + //omutex := sync.Mutex{} + order := int32(0) nextOrder := func() int { - omutex.Lock() - neworder := order - order++ - omutex.Unlock() + neworder := int(atomic.AddInt32(&order, 1)) return neworder } - ff := func(iterator obiseq.IBioSequenceBatch) { - chunks := make(map[int]*obiseq.BioSequenceSlice, 100) + ff := func(iterator obiseq.IBioSequenceBatch, + classifier *obiseq.BioSequenceClassifier) { + + ordered := make([]sSS, 100) for iterator.Next() { batch := iterator.Get() - for _, s := range batch.Slice() { - key := classifier.Code(s) + if batch.Length() > 1 { + classifier.Reset() - slice, ok := chunks[key] - - if !ok { - slice = obiseq.GetBioSequenceSlicePtr() - chunks[key] = slice + if cap(ordered) < batch.Length() { + log.Println("Allocate a new ordered sequences : ", batch.Length()) + ordered = make([]sSS, batch.Length()) + } else { + ordered = ordered[:batch.Length()] } - *slice = append(*slice, s) - } + for i, s := range batch.Slice() { + ordered[i].code = classifier.Code(s) + ordered[i].seq = s + batch.Slice()[i] = nil + } - for k, chunck := range chunks { - newIter.Channel() <- obiseq.MakeBioSequenceBatch(nextOrder(), *chunck...) - delete(chunks, k) - } + batch.Recycle() - batch.Recycle() + _By(func(p1, p2 *sSS) bool { + return p1.code < p2.code + }).Sort(ordered) + + last := ordered[0].code + ss := obiseq.MakeBioSequenceSlice() + for i, v := range ordered { + if v.code != last { + newIter.Push(obiseq.MakeBioSequenceBatch(nextOrder(), ss)) + ss = obiseq.MakeBioSequenceSlice() + last = v.code + } + + ss = append(ss, v.seq) + ordered[i].seq = nil + } + + if len(ss) > 0 { + newIter.Push(obiseq.MakeBioSequenceBatch(nextOrder(), ss)) + } + } else { + newIter.Push(batch.Reorder(nextOrder())) + } } newIter.Done() } for i := 0; i < nworkers-1; i++ { - go ff(iterator.Split()) + go ff(iterator.Split(), classifier.Clone()) } - go ff(iterator) + go ff(iterator, classifier) return newIter, nil } diff --git a/pkg/obichunk/unique.go b/pkg/obichunk/unique.go index 55e75e3..4d91627 100644 --- a/pkg/obichunk/unique.go +++ b/pkg/obichunk/unique.go @@ -11,10 +11,12 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, var err error opts := MakeOptions(options) + nworkers := opts.ParallelWorkers() iUnique := obiseq.MakeIBioSequenceBatch(opts.BufferSize()) if opts.SortOnDisk() { + nworkers = 1 iterator, err = ISequenceChunkOnDisk(iterator, obiseq.HashClassifier(opts.BatchCount()), opts.BufferSize()) @@ -33,13 +35,11 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, } } - nworkers := opts.ParallelWorkers() - iUnique.Add(nworkers) go func() { iUnique.Wait() - close(iUnique.Channel()) + iUnique.Close() }() omutex := sync.Mutex{} @@ -58,14 +58,6 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, cat := opts.Categories() na := opts.NAValue() - // ff = func(input obiseq.IBioSequenceBatch, - // classifier obiseq.BioSequenceClassifier, - // icat int) { - // log.Println(na, nextOrder) - // input.Recycle() - // iUnique.Done() - // } - ff = func(input obiseq.IBioSequenceBatch, classifier *obiseq.BioSequenceClassifier, icat int) { @@ -88,16 +80,17 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, o := 0 for input.Next() { batch := input.Get() + if icat < 0 || len(batch.Slice()) == 1 { - iUnique.Channel() <- batch.Reorder(nextOrder()) + iUnique.Push(batch.Reorder(nextOrder())) } else { - next.Channel() <- batch.Reorder(o) + next.Push(batch.Reorder(o)) o++ } } if icat >= 0 { - close(next.Channel()) + next.Close() } iUnique.Done() @@ -112,12 +105,10 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, obiseq.SequenceClassifier(), len(cat)) - iMerged := iUnique.MakeISliceWorker( - obiseq.MergeSliceWorker( - opts.NAValue(), - opts.StatsOn()...), + iMerged := iUnique.IMergeSequenceBatch(opts.NAValue(), + opts.StatsOn(), opts.BufferSize(), ) - return iMerged.Rebatch(opts.BatchSize()), nil + return iMerged.Speed(), nil } diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go index c7f7a66..cd44a3f 100644 --- a/pkg/obiformats/dispatcher.go +++ b/pkg/obiformats/dispatcher.go @@ -31,11 +31,12 @@ func WriterDispatcher(prototypename string, } out, err := formater(data, - fmt.Sprintf(prototypename, newflux), + fmt.Sprintf(prototypename, dispatcher.Classifier().Value(newflux)), options...) if err != nil { - log.Fatalf("cannot open the output file for key %d", newflux) + log.Fatalf("cannot open the output file for key %s", + dispatcher.Classifier().Value(newflux)) } out.Recycle() diff --git a/pkg/obiformats/ecopcr_read.go b/pkg/obiformats/ecopcr_read.go index f24374f..10deb52 100644 --- a/pkg/obiformats/ecopcr_read.go +++ b/pkg/obiformats/ecopcr_read.go @@ -35,12 +35,12 @@ func __readline__(stream io.Reader) string { return string(line[0:i]) } -func __read_ecopcr_bioseq__(file *__ecopcr_file__) (obiseq.BioSequence, error) { +func __read_ecopcr_bioseq__(file *__ecopcr_file__) (*obiseq.BioSequence, error) { record, err := file.csv.Read() if err != nil { - return obiseq.NilBioSequence, err + return nil, err } name := strings.TrimSpace(record[0]) @@ -65,7 +65,7 @@ func __read_ecopcr_bioseq__(file *__ecopcr_file__) (obiseq.BioSequence, error) { comment = strings.TrimSpace(record[19]) } - bseq := obiseq.MakeBioSequence(name, sequence, comment) + bseq := obiseq.NewBioSequence(name, sequence, comment) annotation := bseq.Annotations() annotation["ac"] = name @@ -168,7 +168,7 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenc go func() { newIter.Wait() - close(newIter.Channel()) + newIter.Close() }() go func() { @@ -181,9 +181,8 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenc slice = append(slice, seq) ii++ if ii >= opt.BatchSize() { - newIter.Channel() <- obiseq.MakeBioSequenceBatch(i, slice...) - slice = make(obiseq.BioSequenceSlice, 0, opt.BatchSize()) - + newIter.Push(obiseq.MakeBioSequenceBatch(i, slice)) + slice = obiseq.MakeBioSequenceSlice() i++ ii = 0 } @@ -192,7 +191,7 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenc } if len(slice) > 0 { - newIter.Channel() <- obiseq.MakeBioSequenceBatch(i, slice...) + newIter.Push(obiseq.MakeBioSequenceBatch(i, slice)) } newIter.Done() diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index a97897b..6d2aa96 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -9,7 +9,6 @@ import ( "os" "strconv" "strings" - "time" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -124,7 +123,7 @@ func _ParseEmblFile(input <-chan _FileChunk, out obiseq.IBioSequenceBatch) { seqBytes.WriteString(parts[i]) } case line == "//": - sequence := obiseq.MakeBioSequence(id, + sequence := obiseq.NewBioSequence(id, seqBytes.Bytes(), defBytes.String()) @@ -140,8 +139,7 @@ func _ParseEmblFile(input <-chan _FileChunk, out obiseq.IBioSequenceBatch) { seqBytes = new(bytes.Buffer) } } - out.Channel() <- obiseq.MakeBioSequenceBatch(order, sequences...) - + out.Push(obiseq.MakeBioSequenceBatch(order, sequences)) } out.Done() @@ -188,11 +186,7 @@ func ReadEMBLBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenceB newIter.Add(nworkers) go func() { - newIter.Wait() - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.Channel()) + newIter.WaitAndClose() }() // for j := 0; j < opt.ParallelWorkers(); j++ { diff --git a/pkg/obiformats/fastseq_header.go b/pkg/obiformats/fastseq_header.go index 0f9df71..e7fead7 100644 --- a/pkg/obiformats/fastseq_header.go +++ b/pkg/obiformats/fastseq_header.go @@ -6,7 +6,7 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func ParseGuessedFastSeqHeader(sequence obiseq.BioSequence) { +func ParseGuessedFastSeqHeader(sequence *obiseq.BioSequence) { if strings.HasPrefix(sequence.Definition(), "{") { ParseFastSeqJsonHeader(sequence) } else { diff --git a/pkg/obiformats/fastseq_interface.go b/pkg/obiformats/fastseq_interface.go index 3a2f70b..af54d3e 100644 --- a/pkg/obiformats/fastseq_interface.go +++ b/pkg/obiformats/fastseq_interface.go @@ -2,4 +2,4 @@ package obiformats import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" -type FormatHeader func(sequence obiseq.BioSequence) string +type FormatHeader func(sequence *obiseq.BioSequence) string diff --git a/pkg/obiformats/fastseq_json_header.go b/pkg/obiformats/fastseq_json_header.go index adc6c80..ec3fefc 100644 --- a/pkg/obiformats/fastseq_json_header.go +++ b/pkg/obiformats/fastseq_json_header.go @@ -49,12 +49,12 @@ func _parse_json_header_(header string, annotations obiseq.Annotation) string { return strings.TrimSpace(header[stop:]) } -func ParseFastSeqJsonHeader(sequence obiseq.BioSequence) { +func ParseFastSeqJsonHeader(sequence *obiseq.BioSequence) { sequence.SetDefinition(_parse_json_header_(sequence.Definition(), sequence.Annotations())) } -func FormatFastSeqJsonHeader(sequence obiseq.BioSequence) string { +func FormatFastSeqJsonHeader(sequence *obiseq.BioSequence) string { annotations := sequence.Annotations() if annotations != nil { diff --git a/pkg/obiformats/fastseq_obi_header.go b/pkg/obiformats/fastseq_obi_header.go index 5faf6cf..5c709aa 100644 --- a/pkg/obiformats/fastseq_obi_header.go +++ b/pkg/obiformats/fastseq_obi_header.go @@ -261,7 +261,7 @@ func ParseOBIFeatures(text string, annotations obiseq.Annotation) string { return string(bytes.TrimSpace(d)) } -func ParseFastSeqOBIHeader(sequence obiseq.BioSequence) { +func ParseFastSeqOBIHeader(sequence *obiseq.BioSequence) { annotations := sequence.Annotations() definition := ParseOBIFeatures(sequence.Definition(), @@ -270,7 +270,7 @@ func ParseFastSeqOBIHeader(sequence obiseq.BioSequence) { sequence.SetDefinition(definition) } -func FormatFastSeqOBIHeader(sequence obiseq.BioSequence) string { +func FormatFastSeqOBIHeader(sequence *obiseq.BioSequence) string { annotations := sequence.Annotations() if annotations != nil { diff --git a/pkg/obiformats/fastseq_read.go b/pkg/obiformats/fastseq_read.go index 92eaf59..3561f19 100644 --- a/pkg/obiformats/fastseq_read.go +++ b/pkg/obiformats/fastseq_read.go @@ -10,7 +10,6 @@ import ( "fmt" "log" "os" - "time" "unsafe" "git.metabarcoding.org/lecasofts/go/obitools/pkg/cutils" @@ -24,7 +23,7 @@ func _FastseqReader(seqfile C.fast_kseq_p, i := 0 ii := 0 - slice := obiseq.GetBioSequenceSlice() + slice := obiseq.MakeBioSequenceSlice() for l := int64(C.next_fast_sek(seqfile)); l > 0; l = int64(C.next_fast_sek(seqfile)) { @@ -45,7 +44,7 @@ func _FastseqReader(seqfile C.fast_kseq_p, comment = "" } - rep := obiseq.MakeBioSequence(name, sequence, comment) + rep := obiseq.NewBioSequence(name, sequence, comment) if s.qual.l > C.ulong(0) { cquality := cutils.ByteSlice(unsafe.Pointer(s.qual.s), int(s.qual.l)) @@ -64,17 +63,17 @@ func _FastseqReader(seqfile C.fast_kseq_p, // log.Printf("\n==> Pushing sequence batch\n") // start := time.Now() - iterator.Channel() <- obiseq.MakeBioSequenceBatch(i, slice...) + iterator.Push(obiseq.MakeBioSequenceBatch(i, slice)) // elapsed := time.Since(start) // log.Printf("\n==>sequences pushed after %s\n", elapsed) - slice = make(obiseq.BioSequenceSlice, 0, batch_size) + slice = obiseq.MakeBioSequenceSlice() i++ ii = 0 } } if len(slice) > 0 { - iterator.Channel() <- obiseq.MakeBioSequenceBatch(i, slice...) + iterator.Push(obiseq.MakeBioSequenceBatch(i, slice)) } iterator.Done() @@ -109,12 +108,7 @@ func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiseq.IB newIter.Add(1) go func() { - newIter.Wait() - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.Channel()) - + newIter.WaitAndClose() log.Println("End of the fastq file reading") }() @@ -142,8 +136,7 @@ func ReadFastSeqBatchFromStdin(options ...WithOption) obiseq.IBioSequenceBatch { newIter.Add(1) go func() { - newIter.Wait() - close(newIter.Channel()) + newIter.WaitAndClose() }() go _FastseqReader(C.open_fast_sek_stdin(C.int32_t(opt.QualityShift())), newIter, opt.BatchSize()) diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 4aeadee..d51fb10 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -7,7 +7,6 @@ import ( "log" "os" "strings" - "time" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -19,9 +18,13 @@ func min(x, y int) int { return y } -func FormatFasta(seq obiseq.BioSequence, formater FormatHeader) string { +func FormatFasta(seq *obiseq.BioSequence, formater FormatHeader) string { var fragments strings.Builder + if seq==nil { + log.Panicln("try to format a nil BioSequence") + } + s := seq.Sequence() l := len(s) @@ -106,16 +109,8 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options newIter.Add(nwriters) go func() { - newIter.Wait() - for len(chunkchan) > 0 { - time.Sleep(time.Millisecond) - } + newIter.WaitAndClose() close(chunkchan) - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.Channel()) - }() ff := func(iterator obiseq.IBioSequenceBatch) { @@ -125,7 +120,7 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options FormatFastaBatch(batch, header_format), batch.Order(), } - newIter.Channel() <- batch + newIter.Push(batch) } newIter.Done() } @@ -156,7 +151,7 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } } - + if opt.CloseFile() { switch file := file.(type) { case *os.File: diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index c2a9b9d..cd9594a 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -11,7 +11,7 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func FormatFastq(seq obiseq.BioSequence, quality_shift int, formater FormatHeader) string { +func FormatFastq(seq *obiseq.BioSequence, quality_shift int, formater FormatHeader) string { l := seq.Length() q := seq.Qualities() @@ -106,15 +106,11 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options newIter.Add(nwriters) go func() { - newIter.Wait() + newIter.WaitAndClose() for len(chunkchan) > 0 { time.Sleep(time.Millisecond) } close(chunkchan) - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.Channel()) }() ff := func(iterator obiseq.IBioSequenceBatch) { @@ -125,7 +121,7 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options batch.Order(), } chunkchan <- chunk - newIter.Channel() <- batch + newIter.Push(batch) } newIter.Done() } diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 41359c7..9790a0c 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -6,7 +6,7 @@ import ( type __options__ struct { fastseq_header_parser obiseq.SeqAnnotator - fastseq_header_writer func(obiseq.BioSequence) string + fastseq_header_writer func(*obiseq.BioSequence) string with_progress_bar bool buffer_size int batch_size int @@ -62,7 +62,7 @@ func (opt Options) ParseFastSeqHeader() obiseq.SeqAnnotator { return opt.pointer.fastseq_header_parser } -func (opt Options) FormatFastSeqHeader() func(obiseq.BioSequence) string { +func (opt Options) FormatFastSeqHeader() func(*obiseq.BioSequence) string { return opt.pointer.fastseq_header_writer } @@ -141,7 +141,7 @@ func OptionsFastSeqDefaultHeaderParser() WithOption { // OptionsFastSeqHeaderFormat allows foor specifying the format // used to write FASTA and FASTQ sequence. -func OptionsFastSeqHeaderFormat(format func(obiseq.BioSequence) string) WithOption { +func OptionsFastSeqHeaderFormat(format func(*obiseq.BioSequence) string) WithOption { f := WithOption(func(opt Options) { opt.pointer.fastseq_header_writer = format }) diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index e10911b..e068dc6 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -66,7 +66,7 @@ func ReadSequencesBatchFromFile(filename string, options ...WithOption) (obiseq. if len(tag) < 30 { newIter := obiseq.MakeIBioSequenceBatch() - close(newIter.Channel()) + newIter.Close() return newIter, nil } diff --git a/pkg/obikmer/encodefourmer.go b/pkg/obikmer/encodefourmer.go index 14e4cd2..6bb735d 100644 --- a/pkg/obikmer/encodefourmer.go +++ b/pkg/obikmer/encodefourmer.go @@ -30,7 +30,7 @@ var __single_base_code__ = []byte{0, // in hexadecimal and 27 in decimal. If the buffer parameter is not nil // the slice is used to store the result, overwise a new slice is // created. -func Encode4mer(seq obiseq.BioSequence, buffer *[]byte) []byte { +func Encode4mer(seq *obiseq.BioSequence, buffer *[]byte) []byte { slength := seq.Length() length := slength - 3 rawseq := seq.Sequence() @@ -65,7 +65,7 @@ func Encode4mer(seq obiseq.BioSequence, buffer *[]byte) []byte { return *buffer } -func Index4mer(seq obiseq.BioSequence, index *[][]int, buffer *[]byte) [][]int { +func Index4mer(seq *obiseq.BioSequence, index *[][]int, buffer *[]byte) [][]int { iternal_buffer := Encode4mer(seq, buffer) @@ -85,7 +85,7 @@ func Index4mer(seq obiseq.BioSequence, index *[][]int, buffer *[]byte) [][]int { return *index } -func FastShiftFourMer(index [][]int, seq obiseq.BioSequence, buffer *[]byte) (int, int) { +func FastShiftFourMer(index [][]int, seq *obiseq.BioSequence, buffer *[]byte) (int, int) { iternal_buffer := Encode4mer(seq, buffer) diff --git a/pkg/obingslibrary/match.go b/pkg/obingslibrary/match.go index e6fc60e..820ff64 100644 --- a/pkg/obingslibrary/match.go +++ b/pkg/obingslibrary/match.go @@ -38,7 +38,7 @@ func (library *NGSLibrary) Compile(maxError int) error { return nil } -func (library *NGSLibrary) Match(sequence obiseq.BioSequence) *DemultiplexMatch { +func (library *NGSLibrary) Match(sequence *obiseq.BioSequence) *DemultiplexMatch { for primers, marker := range *library { m := marker.Match(sequence) if m != nil { @@ -50,7 +50,7 @@ func (library *NGSLibrary) Match(sequence obiseq.BioSequence) *DemultiplexMatch return nil } -func (library *NGSLibrary) ExtractBarcode(sequence obiseq.BioSequence, inplace bool) (obiseq.BioSequence, error) { +func (library *NGSLibrary) ExtractBarcode(sequence *obiseq.BioSequence, inplace bool) (*obiseq.BioSequence, error) { match := library.Match(sequence) return match.ExtractBarcode(sequence, inplace) } @@ -103,7 +103,7 @@ func (marker *Marker) Compile(forward, reverse string, maxError int) error { return nil } -func (marker *Marker) Match(sequence obiseq.BioSequence) *DemultiplexMatch { +func (marker *Marker) Match(sequence *obiseq.BioSequence) *DemultiplexMatch { aseq, _ := obiapat.MakeApatSequence(sequence, false) match := marker.forward.FindAllIndex(aseq, marker.taglength) @@ -134,7 +134,7 @@ func (marker *Marker) Match(sequence obiseq.BioSequence) *DemultiplexMatch { srtag := "" if err != nil { - rtag = obiseq.NilBioSequence + rtag = nil } else { rtag.ReverseComplement(true) srtag = strings.ToLower(rtag.String()) @@ -189,7 +189,7 @@ func (marker *Marker) Match(sequence obiseq.BioSequence) *DemultiplexMatch { defer ftag.Recycle() sftag := "" if err != nil { - ftag = obiseq.NilBioSequence + ftag = nil } else { ftag = ftag.ReverseComplement(true) @@ -218,7 +218,7 @@ func (marker *Marker) Match(sequence obiseq.BioSequence) *DemultiplexMatch { return nil } -func (match *DemultiplexMatch) ExtractBarcode(sequence obiseq.BioSequence, inplace bool) (obiseq.BioSequence, error) { +func (match *DemultiplexMatch) ExtractBarcode(sequence *obiseq.BioSequence, inplace bool) (*obiseq.BioSequence, error) { if !inplace { sequence = sequence.Copy() } diff --git a/pkg/obiseq/batchiterator.go b/pkg/obiseq/batchiterator.go index ea2c371..1be3a50 100644 --- a/pkg/obiseq/batchiterator.go +++ b/pkg/obiseq/batchiterator.go @@ -5,6 +5,7 @@ import ( "log" "sync" "sync/atomic" + "time" "github.com/tevino/abool/v2" ) @@ -16,7 +17,7 @@ type BioSequenceBatch struct { var NilBioSequenceBatch = BioSequenceBatch{nil, -1} -func MakeBioSequenceBatch(order int, sequences ...BioSequence) BioSequenceBatch { +func MakeBioSequenceBatch(order int, sequences BioSequenceSlice) BioSequenceBatch { return BioSequenceBatch{ slice: sequences, order: order, @@ -39,6 +40,15 @@ func (batch BioSequenceBatch) Slice() BioSequenceSlice { func (batch BioSequenceBatch) Length() int { return len(batch.slice) } + +func (batch BioSequenceBatch) NotEmpty() bool { + return batch.slice.NotEmpty() +} + +func (batch BioSequenceBatch) Pop0() *BioSequence { + return batch.slice.Pop0() +} + func (batch BioSequenceBatch) IsNil() bool { return batch.slice == nil } @@ -201,6 +211,30 @@ func (iterator IBioSequenceBatch) Get() BioSequenceBatch { return iterator.pointer.current } +func (iterator IBioSequenceBatch) Push(batch BioSequenceBatch) { + if batch.IsNil() { + log.Panicln("An Nil batch is pushed on the channel") + } + if batch.Length() == 0 { + log.Panicln("An empty batch is pushed on the channel") + } + + iterator.pointer.channel <- batch +} + +func (iterator IBioSequenceBatch) Close() { + close(iterator.pointer.channel) +} + +func (iterator IBioSequenceBatch) WaitAndClose() { + iterator.Wait() + + for len(iterator.Channel()) > 0 { + time.Sleep(time.Millisecond) + } + iterator.Close() +} + // Finished returns 'true' value if no more data is available // from the iterator. func (iterator IBioSequenceBatch) Finished() bool { @@ -227,9 +261,10 @@ func (iterator IBioSequenceBatch) IBioSequence(sizes ...int) IBioSequence { for iterator.Next() { batch := iterator.Get() - for _, s := range batch.slice { - newIter.pointer.channel <- s + for batch.NotEmpty() { + newIter.pointer.channel <- batch.Pop0() } + batch.Recycle() } newIter.Done() }() @@ -304,7 +339,7 @@ func (iterator IBioSequenceBatch) Concat(iterators ...IBioSequenceBatch) IBioSeq if s.order > max_order { max_order = s.order } - newIter.Channel() <- s.Reorder(s.order + previous_max) + newIter.Push(s.Reorder(s.order + previous_max)) } previous_max = max_order + 1 @@ -315,7 +350,7 @@ func (iterator IBioSequenceBatch) Concat(iterators ...IBioSequenceBatch) IBioSeq max_order = s.order + previous_max } - newIter.Channel() <- s.Reorder(s.order + previous_max) + newIter.Push(s.Reorder(s.order + previous_max)) } previous_max = max_order + 1 } @@ -348,23 +383,23 @@ func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBa go func() { order := 0 iterator = iterator.SortBatches() - buffer := GetBioSequenceSlice() + buffer := MakeBioSequenceSlice() for iterator.Next() { seqs := iterator.Get() for _, s := range seqs.slice { buffer = append(buffer, s) if len(buffer) == size { - newIter.Channel() <- MakeBioSequenceBatch(order, buffer...) + newIter.Push(MakeBioSequenceBatch(order, buffer)) order++ - buffer = GetBioSequenceSlice() + buffer = MakeBioSequenceSlice() } } seqs.Recycle() } if len(buffer) > 0 { - newIter.Channel() <- MakeBioSequenceBatch(order, buffer...) + newIter.Push(MakeBioSequenceBatch(order, buffer)) } newIter.Done() @@ -377,15 +412,17 @@ func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBa func (iterator IBioSequenceBatch) Recycle() { log.Println("Start recycling of Bioseq objects") - + recycled := 0 for iterator.Next() { // iterator.Get() batch := iterator.Get() for _, seq := range batch.Slice() { - (&seq).Recycle() + seq.Recycle() + recycled++ } + batch.Recycle() } - log.Println("End of the recycling of Bioseq objects") + log.Printf("End of the recycling of %d Bioseq objects", recycled) } func (iterator IBioSequenceBatch) PairWith(reverse IBioSequenceBatch, sizes ...int) IPairedBioSequenceBatch { @@ -444,10 +481,8 @@ func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate, falseIter.Add(1) go func() { - trueIter.Wait() - falseIter.Wait() - close(trueIter.Channel()) - close(falseIter.Channel()) + trueIter.WaitAndClose() + falseIter.WaitAndClose() }() go func() { @@ -455,8 +490,8 @@ func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate, falseOrder := 0 iterator = iterator.SortBatches() - trueSlice := GetBioSequenceSlice() - falseSlice := GetBioSequenceSlice() + trueSlice := MakeBioSequenceSlice() + falseSlice := MakeBioSequenceSlice() for iterator.Next() { seqs := iterator.Get() @@ -468,26 +503,26 @@ func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate, } if len(trueSlice) == size { - trueIter.Channel() <- MakeBioSequenceBatch(trueOrder, trueSlice...) + trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) trueOrder++ - trueSlice = GetBioSequenceSlice() + trueSlice = MakeBioSequenceSlice() } if len(falseSlice) == size { - falseIter.Channel() <- MakeBioSequenceBatch(falseOrder, falseSlice...) + falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) falseOrder++ - falseSlice = GetBioSequenceSlice() + falseSlice = MakeBioSequenceSlice() } } seqs.Recycle() } if len(trueSlice) > 0 { - trueIter.Channel() <- MakeBioSequenceBatch(trueOrder, trueSlice...) + trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) } if len(falseSlice) > 0 { - falseIter.Channel() <- MakeBioSequenceBatch(falseOrder, falseSlice...) + falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) } trueIter.Done() diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index a4f78ae..b5f15cf 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -2,10 +2,22 @@ package obiseq import ( "crypto/md5" + "log" + "sync/atomic" "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" ) +var _NewSeq = int32(0) +var _RecycleSeq = int32(0) +var _InMemSeq = int32(0) +var _MaxInMemSeq = int32(0) +var _BioLogRate = int(100000) + +func LogBioSeqStatus() { + log.Printf("@@@@>>>> Created seq : %d Destroyed : %d In Memory : %d", _NewSeq, _RecycleSeq, _InMemSeq) +} + type Quality []uint8 var __default_qualities__ = make(Quality, 0, 500) @@ -22,7 +34,7 @@ func __make_default_qualities__(length int) Quality { type Annotation map[string]interface{} -type _BioSequence struct { +type BioSequence struct { id string definition string sequence []byte @@ -31,12 +43,17 @@ type _BioSequence struct { annotations Annotation } -type BioSequence struct { - sequence *_BioSequence -} - func MakeEmptyBioSequence() BioSequence { - bs := _BioSequence{ + atomic.AddInt32(&_NewSeq, 1) + atomic.AddInt32(&_InMemSeq, 1) + + //if atomic.CompareAndSwapInt32()() + + // if int(_NewSeq)%int(_BioLogRate) == 0 { + // LogBioSeqStatus() + // } + + return BioSequence{ id: "", definition: "", sequence: nil, @@ -44,7 +61,11 @@ func MakeEmptyBioSequence() BioSequence { feature: nil, annotations: nil, } - return BioSequence{&bs} +} + +func NewEmptyBioSequence() *BioSequence { + s := MakeEmptyBioSequence() + return &s } func MakeBioSequence(id string, @@ -57,104 +78,109 @@ func MakeBioSequence(id string, return bs } +func NewBioSequence(id string, + sequence []byte, + definition string) *BioSequence { + s := MakeBioSequence(id, sequence, definition) + return &s +} + func (sequence *BioSequence) Recycle() { - pseq := sequence.sequence + atomic.AddInt32(&_RecycleSeq, 1) + atomic.AddInt32(&_InMemSeq, -1) - if pseq != nil { - RecycleSlice(&pseq.sequence) - RecycleSlice(&pseq.feature) - RecycleSlice(&pseq.qualities) + // if int(_RecycleSeq)%int(_BioLogRate) == 0 { + // LogBioSeqStatus() + // } - RecycleAnnotation(&pseq.annotations) + if sequence != nil { + RecycleSlice(&sequence.sequence) + sequence.sequence = nil + RecycleSlice(&sequence.feature) + sequence.feature = nil + RecycleSlice(&sequence.qualities) + sequence.qualities = nil + + RecycleAnnotation(&sequence.annotations) + sequence.annotations = nil } - - sequence.sequence = nil } -var NilBioSequence = BioSequence{sequence: nil} - -func (s BioSequence) IsNil() bool { - return s.sequence == nil -} - -func (s BioSequence) Copy() BioSequence { +func (s *BioSequence) Copy() *BioSequence { newSeq := MakeEmptyBioSequence() - newSeq.sequence.id = s.sequence.id - newSeq.sequence.definition = s.sequence.definition + newSeq.id = s.id + newSeq.definition = s.definition - newSeq.sequence.sequence = GetSlice(s.sequence.sequence...) - newSeq.sequence.qualities = GetSlice(s.sequence.qualities...) - newSeq.sequence.feature = GetSlice(s.sequence.feature...) + newSeq.sequence = GetSlice(s.sequence...) + newSeq.qualities = GetSlice(s.qualities...) + newSeq.feature = GetSlice(s.feature...) - if len(s.sequence.annotations) > 0 { - newSeq.sequence.annotations = GetAnnotation(s.sequence.annotations) + if len(s.annotations) > 0 { + newSeq.annotations = GetAnnotation(s.annotations) } - return newSeq + return &newSeq } -func (s BioSequence) Id() string { - return s.sequence.id +func (s *BioSequence) Id() string { + return s.id } -func (s BioSequence) Definition() string { - return s.sequence.definition +func (s *BioSequence) Definition() string { + return s.definition } -func (s BioSequence) Sequence() []byte { - return s.sequence.sequence +func (s *BioSequence) Sequence() []byte { + return s.sequence } -func (s BioSequence) String() string { - return string(s.sequence.sequence) +func (s *BioSequence) String() string { + return string(s.sequence) } -func (s BioSequence) Length() int { - return len(s.sequence.sequence) +func (s *BioSequence) Length() int { + return len(s.sequence) } -func (s BioSequence) HasQualities() bool { - return len(s.sequence.qualities) > 0 +func (s *BioSequence) HasQualities() bool { + return len(s.qualities) > 0 } -func (s BioSequence) Qualities() Quality { +func (s *BioSequence) Qualities() Quality { if s.HasQualities() { - return s.sequence.qualities + return s.qualities } else { - return __make_default_qualities__(len(s.sequence.sequence)) + return __make_default_qualities__(len(s.sequence)) } } -func (s BioSequence) Features() string { - return string(s.sequence.feature) +func (s *BioSequence) Features() string { + return string(s.feature) } -func (s BioSequence) HasAnnotation() bool { - return len(s.sequence.annotations) > 0 +func (s *BioSequence) HasAnnotation() bool { + return len(s.annotations) > 0 } -func (s BioSequence) Annotations() Annotation { - if s.sequence == nil { - return nil +func (s *BioSequence) Annotations() Annotation { + + if s.annotations == nil { + s.annotations = GetAnnotation() } - if s.sequence.annotations == nil { - s.sequence.annotations = GetAnnotation() - } - - return s.sequence.annotations + return s.annotations } -func (s BioSequence) MD5() [16]byte { - return md5.Sum(s.sequence.sequence) +func (s *BioSequence) MD5() [16]byte { + return md5.Sum(s.sequence) } -func (s BioSequence) Count() int { - if s.sequence.annotations == nil { +func (s *BioSequence) Count() int { + if s.annotations == nil { return 1 } - if val, ok := (s.sequence.annotations)["count"]; ok { + if val, ok := (s.annotations)["count"]; ok { val, err := goutils.InterfaceToInt(val) if err == nil { return val @@ -163,12 +189,12 @@ func (s BioSequence) Count() int { return 1 } -func (s BioSequence) Taxid() int { - if s.sequence.annotations == nil { +func (s *BioSequence) Taxid() int { + if s.annotations == nil { return 1 } - if val, ok := (s.sequence.annotations)["taxid"]; ok { + if val, ok := (s.annotations)["taxid"]; ok { val, err := goutils.InterfaceToInt(val) if err == nil { return val @@ -177,56 +203,56 @@ func (s BioSequence) Taxid() int { return 1 } -func (s BioSequence) SetId(id string) { - s.sequence.id = id +func (s *BioSequence) SetId(id string) { + s.id = id } -func (s BioSequence) SetDefinition(definition string) { - s.sequence.definition = definition +func (s *BioSequence) SetDefinition(definition string) { + s.definition = definition } -func (s BioSequence) SetFeatures(feature []byte) { - if cap(s.sequence.feature) >= 300 { - RecycleSlice(&s.sequence.feature) +func (s *BioSequence) SetFeatures(feature []byte) { + if cap(s.feature) >= 300 { + RecycleSlice(&s.feature) } - s.sequence.feature = feature + s.feature = feature } -func (s BioSequence) SetSequence(sequence []byte) { - if s.sequence.sequence != nil { - RecycleSlice(&s.sequence.sequence) +func (s *BioSequence) SetSequence(sequence []byte) { + if s.sequence != nil { + RecycleSlice(&s.sequence) } - s.sequence.sequence = sequence + s.sequence = sequence } -func (s BioSequence) SetQualities(qualities Quality) { - if s.sequence.qualities != nil { - RecycleSlice(&s.sequence.qualities) +func (s *BioSequence) SetQualities(qualities Quality) { + if s.qualities != nil { + RecycleSlice(&s.qualities) } - s.sequence.qualities = qualities + s.qualities = qualities } -func (s BioSequence) WriteQualities(data []byte) (int, error) { - s.sequence.qualities = append(s.sequence.qualities, data...) +func (s *BioSequence) WriteQualities(data []byte) (int, error) { + s.qualities = append(s.qualities, data...) return len(data), nil } -func (s BioSequence) WriteByteQualities(data byte) error { - s.sequence.qualities = append(s.sequence.qualities, data) +func (s *BioSequence) WriteByteQualities(data byte) error { + s.qualities = append(s.qualities, data) return nil } -func (s BioSequence) Write(data []byte) (int, error) { - s.sequence.sequence = append(s.sequence.sequence, data...) +func (s *BioSequence) Write(data []byte) (int, error) { + s.sequence = append(s.sequence, data...) return len(data), nil } -func (s BioSequence) WriteString(data string) (int, error) { +func (s *BioSequence) WriteString(data string) (int, error) { bdata := []byte(data) return s.Write(bdata) } -func (s BioSequence) WriteByte(data byte) error { - s.sequence.sequence = append(s.sequence.sequence, data) +func (s *BioSequence) WriteByte(data byte) error { + s.sequence = append(s.sequence, data) return nil } diff --git a/pkg/obiseq/biosequenceslice.go b/pkg/obiseq/biosequenceslice.go index 034d9c5..aa39978 100644 --- a/pkg/obiseq/biosequenceslice.go +++ b/pkg/obiseq/biosequenceslice.go @@ -1,3 +1,58 @@ package obiseq -type BioSequenceSlice []BioSequence +import ( + "sync" +) + +type BioSequenceSlice []*BioSequence + +var _BioSequenceSlicePool = sync.Pool{ + New: func() interface{} { + bs := make(BioSequenceSlice, 0, 10) + return &bs + }, +} + +func NewBioSequenceSlice() *BioSequenceSlice { + return _BioSequenceSlicePool.Get().(*BioSequenceSlice) +} + +func MakeBioSequenceSlice() BioSequenceSlice { + return *NewBioSequenceSlice() +} + +func (s *BioSequenceSlice) Recycle() { + // if s == nil { + // log.Panicln("Trying too recycle a nil pointer") + // } + + // // Code added to potentially limit memory leaks + // for i := range *s { + // (*s)[i] = nil + // } + + // *s = (*s)[:0] + // _BioSequenceSlicePool.Put(s) +} + +func (s *BioSequenceSlice) Push(sequence *BioSequence) { + *s = append(*s, sequence) +} + +func (s *BioSequenceSlice) Pop() *BioSequence { + _s := (*s)[len(*s)-1] + (*s)[len(*s)-1] = nil + *s = (*s)[:len(*s)-1] + return _s +} + +func (s *BioSequenceSlice) Pop0() *BioSequence { + _s := (*s)[0] + (*s)[0] = nil + *s = (*s)[1:] + return _s +} + +func (s BioSequenceSlice) NotEmpty() bool { + return len(s) > 0 +} diff --git a/pkg/obiseq/class.go b/pkg/obiseq/class.go index 21353ad..1b0f0e5 100644 --- a/pkg/obiseq/class.go +++ b/pkg/obiseq/class.go @@ -9,19 +9,19 @@ import ( ) type BioSequenceClassifier struct { - Code func(BioSequence) int + Code func(*BioSequence) int Value func(int) string + Reset func() + Clone func() *BioSequenceClassifier } -//type BioSequenceClassifier func(sequence BioSequence) string - func AnnotationClassifier(key string, na string) *BioSequenceClassifier { encode := make(map[string]int, 1000) decode := make([]string, 0, 1000) locke := sync.RWMutex{} maxcode := 0 - code := func(sequence BioSequence) int { + code := func(sequence *BioSequence) int { var val string if sequence.HasAnnotation() { value, ok := sequence.Annotations()[key] @@ -62,12 +62,26 @@ func AnnotationClassifier(key string, na string) *BioSequenceClassifier { return decode[k] } - c := BioSequenceClassifier{code, value} + reset := func() { + locke.Lock() + defer locke.Unlock() + + for k := range encode { + delete(encode, k) + } + decode = decode[:0] + } + + clone := func() *BioSequenceClassifier { + return AnnotationClassifier(key, na) + } + + c := BioSequenceClassifier{code, value, reset, clone} return &c } func PredicateClassifier(predicate SequencePredicate) *BioSequenceClassifier { - code := func(sequence BioSequence) int { + code := func(sequence *BioSequence) int { if predicate(sequence) { return 1 } else { @@ -85,14 +99,22 @@ func PredicateClassifier(predicate SequencePredicate) *BioSequenceClassifier { } - c := BioSequenceClassifier{code, value} + reset := func() { + + } + + clone := func() *BioSequenceClassifier { + return PredicateClassifier(predicate) + } + + c := BioSequenceClassifier{code, value, reset, clone} return &c } // Builds a classifier function based on CRC32 of the sequence // func HashClassifier(size int) *BioSequenceClassifier { - code := func(sequence BioSequence) int { + code := func(sequence *BioSequence) int { return int(crc32.ChecksumIEEE(sequence.Sequence()) % uint32(size)) } @@ -100,7 +122,15 @@ func HashClassifier(size int) *BioSequenceClassifier { return strconv.Itoa(k) } - c := BioSequenceClassifier{code, value} + reset := func() { + + } + + clone := func() *BioSequenceClassifier { + return HashClassifier(size) + } + + c := BioSequenceClassifier{code, value, reset, clone} return &c } @@ -112,7 +142,7 @@ func SequenceClassifier() *BioSequenceClassifier { locke := sync.RWMutex{} maxcode := 0 - code := func(sequence BioSequence) int { + code := func(sequence *BioSequence) int { val := sequence.String() locke.Lock() @@ -140,7 +170,23 @@ func SequenceClassifier() *BioSequenceClassifier { return decode[k] } - c := BioSequenceClassifier{code, value} + reset := func() { + locke.Lock() + defer locke.Unlock() + + // for k := range encode { + // delete(encode, k) + // } + encode = make(map[string]int) + decode = decode[:0] + maxcode = 0 + } + + clone := func() *BioSequenceClassifier { + return SequenceClassifier() + } + + c := BioSequenceClassifier{code, value, reset, clone} return &c } @@ -148,7 +194,7 @@ func RotateClassifier(size int) *BioSequenceClassifier { n := 0 lock := sync.Mutex{} - code := func(sequence BioSequence) int { + code := func(sequence *BioSequence) int { lock.Lock() defer lock.Unlock() n = n % size @@ -160,6 +206,14 @@ func RotateClassifier(size int) *BioSequenceClassifier { return strconv.Itoa(k) } - c := BioSequenceClassifier{code, value} + reset := func() { + + } + + clone := func() *BioSequenceClassifier { + return RotateClassifier(size) + } + + c := BioSequenceClassifier{code, value, reset, clone} return &c } diff --git a/pkg/obiseq/distribute.go b/pkg/obiseq/distribute.go index c4373f8..e51a971 100644 --- a/pkg/obiseq/distribute.go +++ b/pkg/obiseq/distribute.go @@ -6,9 +6,10 @@ import ( ) type IDistribute struct { - outputs map[int]IBioSequenceBatch - news chan int - lock *sync.Mutex + outputs map[int]IBioSequenceBatch + news chan int + classifier *BioSequenceClassifier + lock *sync.Mutex } func (dist *IDistribute) Outputs(key int) (IBioSequenceBatch, error) { @@ -27,6 +28,10 @@ func (dist *IDistribute) News() chan int { return dist.news } +func (dist *IDistribute) Classifier() *BioSequenceClassifier { + return dist.classifier +} + func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes ...int) IDistribute { batchsize := 5000 buffsize := 2 @@ -53,7 +58,7 @@ func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes jobDone.Wait() close(news) for _, i := range outputs { - close(i.Channel()) + i.Close() } }() @@ -67,7 +72,7 @@ func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes slice, ok := slices[key] if !ok { - s := GetBioSequenceSlice() + s := MakeBioSequenceSlice() slice = &s slices[key] = slice orders[key] = 0 @@ -82,9 +87,9 @@ func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes *slice = append(*slice, s) if len(*slice) == batchsize { - outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...) + outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice)) orders[key]++ - s := GetBioSequenceSlice() + s := MakeBioSequenceSlice() slices[key] = &s } } @@ -93,7 +98,7 @@ func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes for key, slice := range slices { if len(*slice) > 0 { - outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...) + outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice)) } } @@ -104,6 +109,7 @@ func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes return IDistribute{ outputs, news, + class, &lock} } diff --git a/pkg/obiseq/iterator.go b/pkg/obiseq/iterator.go index c92ff1d..19e23b4 100644 --- a/pkg/obiseq/iterator.go +++ b/pkg/obiseq/iterator.go @@ -2,14 +2,13 @@ package obiseq import ( "sync" - "time" ) // Private structure implementing an iterator over // bioseq.BioSequence based on a channel. type __ibiosequence__ struct { - channel chan BioSequence - current BioSequence + channel chan *BioSequence + current *BioSequence pushBack bool all_done *sync.WaitGroup buffer_size int @@ -39,10 +38,10 @@ func (iterator IBioSequence) Wait() { iterator.pointer.all_done.Wait() } -func (iterator IBioSequence) Channel() chan BioSequence { +func (iterator IBioSequence) Channel() chan *BioSequence { return iterator.pointer.channel } -func (iterator IBioSequence) PChannel() *chan BioSequence { +func (iterator IBioSequence) PChannel() *chan *BioSequence { return &(iterator.pointer.channel) } @@ -54,8 +53,8 @@ func MakeIBioSequence(sizes ...int) IBioSequence { } i := __ibiosequence__{ - channel: make(chan BioSequence, buffsize), - current: NilBioSequence, + channel: make(chan *BioSequence, buffsize), + current: nil, pushBack: false, buffer_size: buffsize, finished: false, @@ -73,7 +72,7 @@ func (iterator IBioSequence) Split() IBioSequence { i := __ibiosequence__{ channel: iterator.pointer.channel, - current: NilBioSequence, + current: nil, pushBack: false, finished: false, all_done: iterator.pointer.all_done, @@ -87,7 +86,7 @@ func (iterator IBioSequence) Split() IBioSequence { func (iterator IBioSequence) Next() bool { if iterator.IsNil() || *(iterator.pointer.pFinished) { - iterator.pointer.current = NilBioSequence + iterator.pointer.current = nil return false } @@ -103,13 +102,13 @@ func (iterator IBioSequence) Next() bool { return true } - iterator.pointer.current = NilBioSequence + iterator.pointer.current = nil *iterator.pointer.pFinished = true return false } func (iterator IBioSequence) PushBack() { - if !iterator.pointer.current.IsNil() { + if !(iterator.pointer.current == nil) { iterator.pointer.pushBack = true } } @@ -118,7 +117,7 @@ func (iterator IBioSequence) PushBack() { // currently pointed by the iterator. You have to use the // 'Next' method to move to the next entry before calling // 'Get' to retreive the following instance. -func (iterator IBioSequence) Get() BioSequence { +func (iterator IBioSequence) Get() *BioSequence { return iterator.pointer.current } @@ -156,17 +155,13 @@ func (iterator IBioSequence) IBioSequenceBatch(sizes ...int) IBioSequenceBatch { newIter.Add(1) go func() { - newIter.Wait() - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.pointer.channel) + newIter.WaitAndClose() }() go func() { for j := 0; !iterator.Finished(); j++ { batch := BioSequenceBatch{ - slice: GetBioSequenceSlice(), + slice: MakeBioSequenceSlice(), order: j} for i := 0; i < batchsize && iterator.Next(); i++ { seq := iterator.Get() @@ -280,7 +275,7 @@ func (iterator IBioSequence) Tail(n int, sizes ...int) IBioSequence { } newIter := MakeIBioSequence(buffsize) - buffseq := GetBioSequenceSlice() + buffseq := MakeBioSequenceSlice() newIter.Add(1) diff --git a/pkg/obiseq/join.go b/pkg/obiseq/join.go index 9f6d21e..f13f75e 100644 --- a/pkg/obiseq/join.go +++ b/pkg/obiseq/join.go @@ -1,6 +1,6 @@ package obiseq -func (sequence BioSequence) Join(seq2 BioSequence, inplace bool) BioSequence { +func (sequence *BioSequence) Join(seq2 *BioSequence, inplace bool) *BioSequence { if !inplace { sequence = sequence.Copy() diff --git a/pkg/obiseq/merge.go b/pkg/obiseq/merge.go index f4bd738..ac1500b 100644 --- a/pkg/obiseq/merge.go +++ b/pkg/obiseq/merge.go @@ -8,7 +8,7 @@ import ( type StatsOnValues map[string]int -func (sequence BioSequence) HasStatsOn(key string) bool { +func (sequence *BioSequence) HasStatsOn(key string) bool { if !sequence.HasAnnotation() { return false } @@ -20,7 +20,7 @@ func (sequence BioSequence) HasStatsOn(key string) bool { return ok } -func (sequence BioSequence) StatsOn(key string, na string) StatsOnValues { +func (sequence *BioSequence) StatsOn(key string, na string) StatsOnValues { mkey := "merged_" + key annotations := sequence.Annotations() istat, ok := annotations[mkey] @@ -51,9 +51,9 @@ func (sequence BioSequence) StatsOn(key string, na string) StatsOnValues { return stats } -func (sequence BioSequence) StatsPlusOne(key string, toAdd BioSequence, na string) bool { +func (sequence *BioSequence) StatsPlusOne(key string, toAdd *BioSequence, na string) bool { sval := na - stats := sequence.StatsOn(key,na) + stats := sequence.StatsOn(key, na) retval := false if toAdd.HasAnnotation() { @@ -97,7 +97,7 @@ func (stats StatsOnValues) Merge(toMerged StatsOnValues) StatsOnValues { return stats } -func (sequence BioSequence) Merge(tomerge BioSequence, na string, inplace bool, statsOn ...string) BioSequence { +func (sequence *BioSequence) Merge(tomerge *BioSequence, na string, inplace bool, statsOn ...string) *BioSequence { if !inplace { sequence = sequence.Copy() } @@ -112,11 +112,11 @@ func (sequence BioSequence) Merge(tomerge BioSequence, na string, inplace bool, for _, key := range statsOn { if tomerge.HasStatsOn(key) { - smk := sequence.StatsOn(key,na) - mmk := tomerge.StatsOn(key,na) + smk := sequence.StatsOn(key, na) + mmk := tomerge.StatsOn(key, na) smk.Merge(mmk) } else { - sequence.StatsPlusOne(key, tomerge,na) + sequence.StatsPlusOne(key, tomerge, na) } } @@ -143,24 +143,63 @@ func (sequence BioSequence) Merge(tomerge BioSequence, na string, inplace bool, return sequence } -func (sequences BioSequenceSlice) Merge(na string, statsOn ...string) BioSequenceSlice { +func (sequences BioSequenceSlice) Merge(na string, statsOn []string) *BioSequence { seq := sequences[0] + //sequences[0] = nil seq.SetQualities(nil) - seq.Annotations()["count"] = 1 - for _, toMerge := range sequences[1:] { - seq.Merge(toMerge, na, true, statsOn...) - toMerge.Recycle() + if len(sequences) == 1 { + seq.Annotations()["count"] = 1 + for _, v := range statsOn { + seq.StatsOn(v, na) + } + } else { + for k, toMerge := range sequences[1:] { + seq.Merge(toMerge, na, true, statsOn...) + toMerge.Recycle() + sequences[1+k] = nil + } } - return sequences[0:1] + sequences.Recycle() + return seq + } -func MergeSliceWorker(na string, statsOn ...string) SeqSliceWorker { +func (iterator IBioSequenceBatch) IMergeSequenceBatch(na string, statsOn []string, sizes ...int) IBioSequenceBatch { + batchsize := 100 + buffsize := iterator.BufferSize() - worker := func(sequences BioSequenceSlice) BioSequenceSlice { - return sequences.Merge(na, statsOn...) + if len(sizes) > 0 { + batchsize = sizes[0] + } + if len(sizes) > 1 { + buffsize = sizes[1] } - return worker + newIter := MakeIBioSequenceBatch(buffsize) + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + for j := 0; !iterator.Finished(); j++ { + batch := BioSequenceBatch{ + slice: MakeBioSequenceSlice(), + order: j} + for i := 0; i < batchsize && iterator.Next(); i++ { + seqs := iterator.Get() + batch.slice = append(batch.slice, seqs.slice.Merge(na, statsOn)) + } + if batch.Length() > 0 { + newIter.Push(batch) + } + } + newIter.Done() + }() + + return newIter } diff --git a/pkg/obiseq/pool.go b/pkg/obiseq/pool.go index de8cd5f..182086d 100644 --- a/pkg/obiseq/pool.go +++ b/pkg/obiseq/pool.go @@ -14,8 +14,10 @@ var _BioSequenceByteSlicePool = sync.Pool{ } func RecycleSlice(s *[]byte) { - *s = (*s)[:0] - _BioSequenceByteSlicePool.Put(s) + if s != nil && *s != nil { + *s = (*s)[:0] + _BioSequenceByteSlicePool.Put(s) + } } func GetSlice(values ...byte) []byte { @@ -30,7 +32,7 @@ func GetSlice(values ...byte) []byte { var BioSequenceAnnotationPool = sync.Pool{ New: func() interface{} { - bs := make(Annotation, 100) + bs := make(Annotation, 5) return &bs }, } @@ -40,12 +42,16 @@ func RecycleAnnotation(a *Annotation) { for k := range *a { delete(*a, k) } - BioSequenceAnnotationPool.Put(&(a)) + BioSequenceAnnotationPool.Put(a) } } func GetAnnotation(values ...Annotation) Annotation { - a := *(BioSequenceAnnotationPool.Get().(*Annotation)) + a := Annotation(nil) + + for a == nil { + a = *(BioSequenceAnnotationPool.Get().(*Annotation)) + } if len(values) > 0 { goutils.CopyMap(a, values[0]) @@ -53,58 +59,3 @@ func GetAnnotation(values ...Annotation) Annotation { return a } - -var _BioSequenceSlicePool = sync.Pool{ - New: func() interface{} { - bs := make(BioSequenceSlice, 0, 5000) - return &bs - }, -} - -func (s *BioSequenceSlice) Recycle() { - *s = (*s)[:0] - _BioSequenceSlicePool.Put(s) -} - -func GetBioSequenceSlicePtr(values ...BioSequence) *BioSequenceSlice { - s := _BioSequenceSlicePool.Get().(*BioSequenceSlice) - - if len(values) > 0 { - *s = append(*s, values...) - } - - return s -} - -func GetBioSequenceSlice(values ...BioSequence) BioSequenceSlice { - return *GetBioSequenceSlicePtr(values...) -} - -// var __bioseq__pool__ = sync.Pool{ -// New: func() interface{} { -// var bs _BioSequence -// bs.annotations = make(Annotation, 50) -// return &bs -// }, -// } - -// func MakeEmptyBioSequence() BioSequence { -// bs := BioSequence{__bioseq__pool__.Get().(*_BioSequence)} -// return bs -// } - -// func MakeBioSequence(id string, -// sequence []byte, -// definition string) BioSequence { -// bs := MakeEmptyBioSequence() -// bs.SetId(id) -// bs.Write(sequence) -// bs.SetDefinition(definition) -// return bs -// } - -// func (sequence *BioSequence) Recycle() { -// sequence.Reset() -// __bioseq__pool__.Put(sequence.sequence) -// sequence.sequence = nil -// } diff --git a/pkg/obiseq/predicate.go b/pkg/obiseq/predicate.go index 6ff0fa1..786593f 100644 --- a/pkg/obiseq/predicate.go +++ b/pkg/obiseq/predicate.go @@ -1,9 +1,9 @@ package obiseq -type SequencePredicate func(BioSequence) bool +type SequencePredicate func(*BioSequence) bool func (predicate1 SequencePredicate) And(predicate2 SequencePredicate) SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { return predicate1(sequence) && predicate2(sequence) } @@ -11,7 +11,7 @@ func (predicate1 SequencePredicate) And(predicate2 SequencePredicate) SequencePr } func (predicate1 SequencePredicate) Or(predicate2 SequencePredicate) SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { return predicate1(sequence) || predicate2(sequence) } @@ -19,7 +19,7 @@ func (predicate1 SequencePredicate) Or(predicate2 SequencePredicate) SequencePre } func (predicate1 SequencePredicate) Xor(predicate2 SequencePredicate) SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { p1 := predicate1(sequence) p2 := predicate2(sequence) return (p1 && !p2) || (p2 && !p1) @@ -29,7 +29,7 @@ func (predicate1 SequencePredicate) Xor(predicate2 SequencePredicate) SequencePr } func (predicate1 SequencePredicate) Not() SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { return !predicate1(sequence) } @@ -38,7 +38,7 @@ func (predicate1 SequencePredicate) Not() SequencePredicate { func HasAttribute(name string) SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { if sequence.HasAnnotation() { _, ok := (sequence.Annotations())[name] return ok @@ -51,7 +51,7 @@ func HasAttribute(name string) SequencePredicate { } func MoreAbundantThan(count int) SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { return sequence.Count() > count } @@ -59,7 +59,7 @@ func MoreAbundantThan(count int) SequencePredicate { } func IsLongerOrEqualTo(length int) SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { return sequence.Length() >= length } @@ -67,7 +67,7 @@ func IsLongerOrEqualTo(length int) SequencePredicate { } func IsShorterOrEqualTo(length int) SequencePredicate { - f := func(sequence BioSequence) bool { + f := func(sequence *BioSequence) bool { return sequence.Length() <= length } diff --git a/pkg/obiseq/revcomp.go b/pkg/obiseq/revcomp.go index 5357ec7..2277d87 100644 --- a/pkg/obiseq/revcomp.go +++ b/pkg/obiseq/revcomp.go @@ -5,13 +5,13 @@ var __revcmp_dna__ = []byte(".TVGHEFCDIJMLKNOPQYSAABWXRZ#!][") // Reverse complements a DNA sequence. // If the inplace parametter is true, that operation is done in place. -func (sequence BioSequence) ReverseComplement(inplace bool) BioSequence { +func (sequence *BioSequence) ReverseComplement(inplace bool) *BioSequence { if !inplace { sequence = sequence.Copy() } - s := sequence.sequence.sequence + s := sequence.sequence for i, j := sequence.Length()-1, 0; i >= j; i-- { diff --git a/pkg/obiseq/speed.go b/pkg/obiseq/speed.go index 198b4e4..1892525 100644 --- a/pkg/obiseq/speed.go +++ b/pkg/obiseq/speed.go @@ -1,6 +1,39 @@ package obiseq -func (iterator IBioSequenceBatch) speed() IBioSequenceBatch { +import ( + "os" + + "github.com/schollz/progressbar/v3" +) + +func (iterator IBioSequenceBatch) Speed() IBioSequenceBatch { newIter := MakeIBioSequenceBatch() + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + bar := progressbar.NewOptions( + -1, + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionSetWidth(15), + progressbar.OptionShowCount(), + progressbar.OptionShowIts(), + progressbar.OptionSetDescription("[Sequence Processing]")) + + go func() { + + for iterator.Next() { + batch := iterator.Get() + l := batch.Length() + newIter.Push(batch) + bar.Add(l) + } + + newIter.Done() + }() + return newIter } diff --git a/pkg/obiseq/subseq.go b/pkg/obiseq/subseq.go index bdf353d..f7bd68d 100644 --- a/pkg/obiseq/subseq.go +++ b/pkg/obiseq/subseq.go @@ -7,32 +7,32 @@ import ( // Returns a sub sequence start from position 'from' included, // to position 'to' excluded. Coordinates start at position 0. -func (sequence BioSequence) Subsequence(from, to int, circular bool) (BioSequence, error) { +func (sequence *BioSequence) Subsequence(from, to int, circular bool) (*BioSequence, error) { if from >= to && !circular { - return NilBioSequence, errors.New("from greater than to") + return nil, errors.New("from greater than to") } if from < 0 || from >= sequence.Length() { - return NilBioSequence, errors.New("from out of bounds") + return nil, errors.New("from out of bounds") } if to <= 0 || to > sequence.Length() { - return NilBioSequence, errors.New("to out of bounds") + return nil, errors.New("to out of bounds") } - var newSeq BioSequence + var newSeq *BioSequence if from < to { - newSeq = MakeEmptyBioSequence() + newSeq = NewEmptyBioSequence() newSeq.Write(sequence.Sequence()[from:to]) if sequence.HasQualities() { newSeq.WriteQualities(sequence.Qualities()[from:to]) } - newSeq.sequence.id = fmt.Sprintf("%s_sub[%d..%d]", sequence.Id(), from+1, to) - newSeq.sequence.definition = sequence.sequence.definition + newSeq.id = fmt.Sprintf("%s_sub[%d..%d]", sequence.Id(), from+1, to) + newSeq.definition = sequence.definition } else { newSeq, _ = sequence.Subsequence(from, sequence.Length(), false) newSeq.Write(sequence.Sequence()[0:to]) @@ -44,7 +44,7 @@ func (sequence BioSequence) Subsequence(from, to int, circular bool) (BioSequenc } if len(sequence.Annotations()) > 0 { - newSeq.sequence.annotations = GetAnnotation(sequence.Annotations()) + newSeq.annotations = GetAnnotation(sequence.Annotations()) } return newSeq, nil diff --git a/pkg/obiseq/workers.go b/pkg/obiseq/workers.go index afe6ea7..2352b94 100644 --- a/pkg/obiseq/workers.go +++ b/pkg/obiseq/workers.go @@ -2,16 +2,15 @@ package obiseq import ( "log" - "time" ) -type SeqAnnotator func(BioSequence) +type SeqAnnotator func(*BioSequence) -type SeqWorker func(BioSequence) BioSequence +type SeqWorker func(*BioSequence) *BioSequence type SeqSliceWorker func(BioSequenceSlice) BioSequenceSlice func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker { - f := func(seq BioSequence) BioSequence { + f := func(seq *BioSequence) *BioSequence { function(seq) return seq } @@ -63,11 +62,7 @@ func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IB newIter.Add(nworkers) go func() { - newIter.Wait() - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.pointer.channel) + newIter.WaitAndClose() log.Println("End of the batch workers") }() @@ -78,7 +73,7 @@ func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IB for i, seq := range batch.slice { batch.slice[i] = worker(seq) } - newIter.pointer.channel <- batch + newIter.Push(batch) } newIter.Done() } @@ -109,11 +104,7 @@ func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes newIter.Add(nworkers) go func() { - newIter.Wait() - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.pointer.channel) + newIter.WaitAndClose() log.Println("End of the batch slice workers") }() diff --git a/pkg/obitax/issuubcladeof.go b/pkg/obitax/issuubcladeof.go index 37a05e6..fbb1849 100644 --- a/pkg/obitax/issuubcladeof.go +++ b/pkg/obitax/issuubcladeof.go @@ -33,7 +33,7 @@ func IsSubCladeOf(taxonomy Taxonomy, taxid int) obiseq.SequencePredicate { log.Fatalf("Cannot find taxon : %d (%v)", taxid, err) } - f := func(sequence obiseq.BioSequence) bool { + f := func(sequence *obiseq.BioSequence) bool { taxon, err := taxonomy.Taxon(sequence.Taxid()) return err == nil && taxon.IsSubCladeOf(parent) } diff --git a/pkg/obitools/obipairing/pairing.go b/pkg/obitools/obipairing/pairing.go index ae62aac..d5ff1ce 100644 --- a/pkg/obitools/obipairing/pairing.go +++ b/pkg/obitools/obipairing/pairing.go @@ -5,7 +5,6 @@ import ( "math" "os" "runtime" - "time" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obialign" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" @@ -49,7 +48,7 @@ func _Abs(x int) int { // Outputs: // cgatgcta..........aatcgtacga // -func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioSequence { +func JoinPairedSequence(seqA, seqB *obiseq.BioSequence, inplace bool) *obiseq.BioSequence { if !inplace { seqA = seqA.Copy() @@ -64,7 +63,7 @@ func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioS } if inplace { - (&seqB).Recycle() + seqB.Recycle() } return seqA @@ -104,10 +103,10 @@ func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioS // An obiseq.BioSequence corresponding to the assembling of the both // input sequence. // -func AssemblePESequences(seqA, seqB obiseq.BioSequence, +func AssemblePESequences(seqA, seqB *obiseq.BioSequence, gap float64, delta, minOverlap int, minIdentity float64, withStats bool, inplace bool, - arenaAlign obialign.PEAlignArena) obiseq.BioSequence { + arenaAlign obialign.PEAlignArena) *obiseq.BioSequence { score, path := obialign.PEAlign(seqA, seqB, gap, delta, arenaAlign) cons, match := obialign.BuildQualityConsensus(seqA, seqB, path) @@ -152,8 +151,8 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence, annot["score_norm"] = scoreNorm if inplace { - (&seqA).Recycle() - (&seqB).Recycle() + seqA.Recycle() + seqB.Recycle() } } } else { @@ -222,11 +221,7 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, newIter.Add(nworkers) go func() { - newIter.Wait() - for len(newIter.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - close(newIter.Channel()) + newIter.WaitAndClose() log.Printf("End of the sequence Pairing") }() @@ -254,10 +249,10 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, } } bar.Add(batch.Length() - processed) - newIter.Channel() <- obiseq.MakeBioSequenceBatch( + newIter.Push(obiseq.MakeBioSequenceBatch( batch.Order(), - cons..., - ) + cons, + )) } newIter.Done() }