From e8fff6477b7e1f939ad9363e09a64132088945ef Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 14 Jan 2022 23:11:36 +0100 Subject: [PATCH] Work on iterators and recycling of biosequences --- cmd/obitools/obiconvert/main.go | 4 +- cmd/obitools/obicount/main.go | 14 ++-- cmd/obitools/obipairing/main.go | 19 +++--- cmd/obitools/obipcr/main.go | 2 +- cmd/test/main.go | 5 ++ pkg/obialign/pairedendalign.go | 9 +++ pkg/obiapat/pcr.go | 8 +-- pkg/obiformats/fastseq_write_fasta.go | 29 +++++---- pkg/obiformats/fastseq_write_fastq.go | 13 ++-- pkg/obiformats/universal_write.go | 57 ++++++++++------ pkg/obioptions/options.go | 38 +++++++++-- pkg/obiseq/batchiterator.go | 22 ++++++- pkg/obiseq/biosequence.go | 6 +- pkg/obiseq/iterator.go | 33 ++++++++-- pkg/obiseq/pairedbatchiterator.go | 19 +++++- pkg/obiseq/pool.go | 6 +- pkg/obiseq/workers.go | 6 +- pkg/obitools/obiconvert/sequence_reader.go | 16 ++++- pkg/obitools/obiconvert/sequence_writer.go | 75 ++++++++++++++++++++++ pkg/obitools/obipairing/pairing.go | 56 ++++++++++------ pkg/obitools/obipcr/options.go | 17 +++++ pkg/obitools/obipcr/pcr.go | 7 +- 22 files changed, 350 insertions(+), 111 deletions(-) diff --git a/cmd/obitools/obiconvert/main.go b/cmd/obitools/obiconvert/main.go index f0e0e70..53776ca 100644 --- a/cmd/obitools/obiconvert/main.go +++ b/cmd/obitools/obiconvert/main.go @@ -13,6 +13,6 @@ func main() { _, args, _ := optionParser(os.Args) - fs, _ := obiconvert.ReadBioSequences(args...) - obiconvert.WriteBioSequences(fs) + fs, _ := obiconvert.ReadBioSequencesBatch(args...) + obiconvert.WriteBioSequencesBatch(fs,true) } diff --git a/cmd/obitools/obicount/main.go b/cmd/obitools/obicount/main.go index f945200..e99ea10 100644 --- a/cmd/obitools/obicount/main.go +++ b/cmd/obitools/obicount/main.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "os" - "runtime/trace" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obicount" @@ -21,12 +20,12 @@ func main() { // pprof.StartCPUProfile(f) // defer pprof.StopCPUProfile() - ftrace, err := os.Create("cpu.trace") - if err != nil { - log.Fatal(err) - } - trace.Start(ftrace) - defer trace.Stop() + // ftrace, err := os.Create("cpu.trace") + // if err != nil { + // log.Fatal(err) + // } + // trace.Start(ftrace) + // defer trace.Stop() optionParser := obioptions.GenerateOptionParser( obiconvert.InputOptionSet, @@ -47,6 +46,7 @@ func main() { nread += s.Count() nvariant++ nsymbol += s.Length() + (&s).Recycle() } if obicount.IsPrintingVariantCount() { diff --git a/cmd/obitools/obipairing/main.go b/cmd/obitools/obipairing/main.go index 0cbbeba..3b43fbe 100644 --- a/cmd/obitools/obipairing/main.go +++ b/cmd/obitools/obipairing/main.go @@ -1,24 +1,22 @@ package main import ( - "log" "os" - "runtime/pprof" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipairing" ) func main() { // go tool pprof -http=":8000" ./obipairing ./cpu.pprof - f, err := os.Create("cpu.pprof") - if err != nil { - log.Fatal(err) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() + // f, err := os.Create("cpu.pprof") + // if err != nil { + // log.Fatal(err) + // } + // pprof.StartCPUProfile(f) + // defer pprof.StopCPUProfile() // go tool trace cpu.trace // ftrace, err := os.Create("cpu.trace") @@ -33,6 +31,5 @@ func main() { optionParser(os.Args) pairs, _ := obipairing.IBatchPairedSequence() paired := obipairing.IAssemblePESequencesBatch(pairs, 2, 50, 20, true) - written, _ := obiformats.WriteFastqBatchToStdout(paired) - written.Destroy() + obiconvert.WriteBioSequencesBatch(paired, true) } diff --git a/cmd/obitools/obipcr/main.go b/cmd/obitools/obipcr/main.go index a070e2e..76f4c51 100644 --- a/cmd/obitools/obipcr/main.go +++ b/cmd/obitools/obipcr/main.go @@ -30,5 +30,5 @@ func main() { sequences, _ := obiconvert.ReadBioSequencesBatch(args...) amplicons, _ := obipcr.PCR(sequences) - obiconvert.WriteBioSequences(amplicons) + obiconvert.WriteBioSequencesBatch(amplicons,true) } diff --git a/cmd/test/main.go b/cmd/test/main.go index 6271107..6d45244 100644 --- a/cmd/test/main.go +++ b/cmd/test/main.go @@ -59,4 +59,9 @@ func main() { sA.ReverseComplement(true) fmt.Println(string(sA.Sequence())) fmt.Println(string(sA.Id())) + + sA.Reset() + fmt.Println(sA.Length()) + fmt.Println(sA.String()) + } diff --git a/pkg/obialign/pairedendalign.go b/pkg/obialign/pairedendalign.go index 01f2854..46e17d1 100644 --- a/pkg/obialign/pairedendalign.go +++ b/pkg/obialign/pairedendalign.go @@ -282,6 +282,11 @@ func PEAlign(seqA, seqB obiseq.BioSequence, _InitDNAScoreMatrix() } + // log.Println("==============") + // log.Println(seqA.String()) + // log.Println(seqB.String()) + // log.Println("--------------") + index := obikmer.Index4mer(seqA, &arena.pointer.fastIndex, &arena.pointer.fastBuffer) @@ -294,6 +299,10 @@ func PEAlign(seqA, seqB obiseq.BioSequence, over = seqB.Length() + shift } + // log.Println(seqA.String()) + // log.Println(seqB.String()) + // log.Printf("Shift : %d Score : %d Over : %d La : %d:%d Lb: %d:%d\n", shift, fastScore, over, seqA.Length(), len(seqA.Qualities()), seqB.Length(), len(seqB.Qualities())) + if fastScore+3 < over { if shift > 0 { startA = shift - delta diff --git a/pkg/obiapat/pcr.go b/pkg/obiapat/pcr.go index 37eea50..d780eb4 100644 --- a/pkg/obiapat/pcr.go +++ b/pkg/obiapat/pcr.go @@ -241,7 +241,7 @@ func _Pcr(seq ApatSequence, sequence obiseq.BioSequence, match, _ := sequence.Subsequence(fm[0], fm[1], opt.pointer.circular) annot["forward_match"] = match.String() - match.Destroy() + (&match).Recycle() annot["forward_error"] = erri @@ -249,7 +249,7 @@ func _Pcr(seq ApatSequence, sequence obiseq.BioSequence, match, _ = sequence.Subsequence(rm[0], rm[1], opt.pointer.circular) match = match.ReverseComplement(true) annot["reverse_match"] = match.String() - match.Destroy() + (&match).Recycle() annot["reverse_error"] = errj results = append(results, amplicon) @@ -315,14 +315,14 @@ func _Pcr(seq ApatSequence, sequence obiseq.BioSequence, match, _ := sequence.Subsequence(rm[0], rm[1], opt.pointer.circular) match.ReverseComplement(true) annot["forward_match"] = match.String() - match.Destroy() + (&match).Recycle() annot["forward_error"] = errj annot["reverse_primer"] = reverse.String() match, _ = sequence.Subsequence(fm[0], fm[1], opt.pointer.circular) annot["reverse_match"] = match.String() - match.Destroy() + (&match).Recycle() annot["reverse_error"] = erri results = append(results, amplicon) diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index b8183bc..7fa0de9 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -7,7 +7,6 @@ import ( "log" "os" "strings" - "sync" "time" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" @@ -82,26 +81,30 @@ func WriteFastaToStdout(iterator obiseq.IBioSequence, options ...WithOption) err return WriteFasta(iterator, os.Stdout, options...) } -func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) error { +func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) { + opt := MakeOptions(options) + buffsize := iterator.BufferSize() newIter := obiseq.MakeIBioSequenceBatch(buffsize) - opt := MakeOptions(options) - nwriters := 4 + nwriters := opt.ParallelWorkers() chunkchan := make(chan FileChunck) - chunkwait := sync.WaitGroup{} header_format := opt.FormatFastSeqHeader() - chunkwait.Add(nwriters) + newIter.Add(nwriters) go func() { - chunkwait.Wait() + newIter.Wait() for len(chunkchan) > 0 { time.Sleep(time.Millisecond) } close(chunkchan) + for len(newIter.Channel()) > 0 { + time.Sleep(time.Millisecond) + } + close(newIter.Channel()) }() ff := func(iterator obiseq.IBioSequenceBatch) { @@ -116,9 +119,11 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options newIter.Done() } - for i := 0; i < nwriters; i++ { + log.Println("Start of the fasta file writing") + for i := 0; i < nwriters-1; i++ { go ff(iterator.Split()) } + go ff(iterator) next_to_send := 0 received := make(map[int]FileChunck, 100) @@ -142,22 +147,22 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } }() - return nil + return newIter, nil } -func WriteFastaBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) error { +func WriteFastaBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { return WriteFastaBatch(iterator, os.Stdout, options...) } func WriteFastaBatchToFile(iterator obiseq.IBioSequenceBatch, filename string, - options ...WithOption) error { + options ...WithOption) (obiseq.IBioSequenceBatch, error) { file, err := os.Create(filename) if err != nil { log.Fatalf("open file error: %v", err) - return err + return obiseq.NilIBioSequenceBatch, err } return WriteFastaBatch(iterator, file, options...) diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 52afee2..620bc53 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -82,11 +82,12 @@ type FileChunck struct { } func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) { + opt := MakeOptions(options) + buffsize := iterator.BufferSize() newIter := obiseq.MakeIBioSequenceBatch(buffsize) - opt := MakeOptions(options) - nwriters := 4 + nwriters := opt.ParallelWorkers() chunkchan := make(chan FileChunck) @@ -110,19 +111,21 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ff := func(iterator obiseq.IBioSequenceBatch) { for iterator.Next() { batch := iterator.Get() - chunkchan <- FileChunck{ + chunk := FileChunck{ FormatFastqBatch(batch, quality, header_format), batch.Order(), } + chunkchan <- chunk newIter.Channel() <- batch } newIter.Done() } - log.Println("Start of the fastq file reading") - for i := 0; i < nwriters; i++ { + log.Println("Start of the fastq file writing") + for i := 0; i < nwriters-1; i++ { go ff(iterator.Split()) } + go ff(iterator) next_to_send := 0 received := make(map[int]FileChunck, 100) diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index dd2400b..537bea2 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -52,28 +52,45 @@ func WriteSequencesToStdout(iterator obiseq.IBioSequence, options ...WithOption) return WriteSequences(iterator, os.Stdout, options...) } -// func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, -// file io.Writer, -// options ...WithOption) error { +func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, + file io.Writer, + options ...WithOption) (obiseq.IBioSequenceBatch,error) { -// opts := MakeOptions(options) + var newIter obiseq.IBioSequenceBatch + var err error -// header_format := opts.FormatFastSeqHeader() -// quality := opts.QualityShift() + ok := iterator.Next() -// ok := iterator.Next() + if ok { + iterator.PushBack() + batch := iterator.Get() + if batch.Slice()[0].HasQualities() { + newIter,err = WriteFastqBatch(iterator, file, options...) + } else { + newIter,err = WriteFastaBatch(iterator, file, options...) + } -// if ok { -// batch := iterator.Get() -// if batch.Slice()[0].HasQualities() { -// file.Write() -// fmt.Fprintln(file, FormatFastq(seq, quality, header_format)) -// WriteFastq(iterator, file, options...) -// } else { -// fmt.Fprintln(file, FormatFasta(seq, header_format)) -// WriteFasta(iterator, file, options...) -// } -// } + return newIter,err + } -// return nil -// } + return obiseq.NilIBioSequenceBatch,fmt.Errorf("input iterator not ready") +} + +func WriteSequencesBatchToStdout(iterator obiseq.IBioSequenceBatch, + options ...WithOption) (obiseq.IBioSequenceBatch,error) { + return WriteSequenceBatch(iterator, os.Stdout, options...) +} + +func WriteSequencesBatchToFile(iterator obiseq.IBioSequenceBatch, + filename string, + options ...WithOption) (obiseq.IBioSequenceBatch,error) { + + file, err := os.Create(filename) + + if err != nil { + log.Fatalf("open file error: %v", err) + return obiseq.NilIBioSequenceBatch, err + } + + return WriteSequenceBatch(iterator, file, options...) +} diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 90c6511..95e5438 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -3,18 +3,26 @@ package obioptions import ( "fmt" "os" + "runtime" "github.com/DavidGamba/go-getoptions" ) -var __debug__ = false +var _Debug = false +var _ParallelWorkers = runtime.NumCPU() - 1 +var _BufferSize = 1 +var _BatchSize = 5000 type ArgumentParser func([]string) (*getoptions.GetOpt, []string, error) func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser { options := getoptions.New() options.Bool("help", false, options.Alias("h", "?")) - options.BoolVar(&__debug__, "debug", false) + options.BoolVar(&_Debug, "debug", false) + + options.IntVar(&_ParallelWorkers, "workers", runtime.NumCPU()-1, + options.Alias("w"), + options.Description("Number of parallele threads computing the result")) for _, o := range optionset { o(options) @@ -32,15 +40,33 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser } } -// Predicate indicating if the debug mode is activated +// Predicate indicating if the debug mode is activated. func IsDebugMode() bool { - return __debug__ + return _Debug } +// ParallelWorkers returns the number of parallel workers requested by +// the command line option --workers|-w. +func ParallelWorkers() int { + return _ParallelWorkers +} + +// BufferSize returns the expeted channel buffer size for obitools +func BufferSize() int { + return _BufferSize +} + +// BatchSize returns the expeted size of the sequence batches +func BatchSize() int { + return _BatchSize +} + +// DebugOn sets the debug mode on. func DebugOn() { - __debug__ = true + _Debug = true } +// DebugOff sets the debug mode off. func DebugOff() { - __debug__ = false + _Debug = false } diff --git a/pkg/obiseq/batchiterator.go b/pkg/obiseq/batchiterator.go index 99bd524..c72f956 100644 --- a/pkg/obiseq/batchiterator.go +++ b/pkg/obiseq/batchiterator.go @@ -39,6 +39,7 @@ func (batch BioSequenceBatch) IsNil() bool { type __ibiosequencebatch__ struct { channel chan BioSequenceBatch current BioSequenceBatch + pushBack bool all_done *sync.WaitGroup buffer_size int finished bool @@ -61,9 +62,11 @@ func MakeIBioSequenceBatch(sizes ...int) IBioSequenceBatch { i := __ibiosequencebatch__{ channel: make(chan BioSequenceBatch, buffsize), current: NilBioSequenceBatch, + pushBack: false, buffer_size: buffsize, finished: false, - p_finished: nil} + p_finished: nil, + } i.p_finished = &i.finished waiting := sync.WaitGroup{} i.all_done = &waiting @@ -99,6 +102,7 @@ func (iterator IBioSequenceBatch) Split() IBioSequenceBatch { i := __ibiosequencebatch__{ channel: iterator.pointer.channel, current: NilBioSequenceBatch, + pushBack: false, all_done: iterator.pointer.all_done, buffer_size: iterator.pointer.buffer_size, finished: false, @@ -111,6 +115,12 @@ func (iterator IBioSequenceBatch) Next() bool { if *(iterator.pointer.p_finished) { return false } + + if iterator.pointer.pushBack { + iterator.pointer.pushBack = false + return true + } + next, ok := (<-iterator.pointer.channel) if ok { @@ -123,6 +133,12 @@ func (iterator IBioSequenceBatch) Next() bool { return false } +func (iterator IBioSequenceBatch) PushBack() { + if !iterator.pointer.current.IsNil() { + iterator.pointer.pushBack = true + } +} + // The 'Get' method returns the instance of BioSequenceBatch // currently pointed by the iterator. You have to use the // 'Next' method to move to the next entry before calling @@ -303,14 +319,14 @@ func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBa return newIter } -func (iterator IBioSequenceBatch) Destroy() { +func (iterator IBioSequenceBatch) Recycle() { log.Println("Start recycling of Bioseq objects") for iterator.Next() { batch := iterator.Get() for _, seq := range batch.Slice() { - (&seq).Destroy() + (&seq).Recycle() } } log.Println("End of the recycling of Bioseq objects") diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index 39cd4fc..974395a 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -44,7 +44,7 @@ func (s BioSequence) IsNil() bool { return s.sequence == nil } -func (s BioSequence) Reset() { +func (s *BioSequence) Reset() { s.sequence.id.Reset() s.sequence.definition.Reset() s.sequence.sequence.Reset() @@ -168,6 +168,10 @@ func (s BioSequence) SetQualities(qualities Quality) { s.sequence.qualities.Write(qualities) } +func (s BioSequence) WriteQualities(data []byte) (int, error) { + return s.sequence.qualities.Write(data) +} + func (s BioSequence) Write(data []byte) (int, error) { return s.sequence.sequence.Write(data) } diff --git a/pkg/obiseq/iterator.go b/pkg/obiseq/iterator.go index a7ffb92..e7af044 100644 --- a/pkg/obiseq/iterator.go +++ b/pkg/obiseq/iterator.go @@ -10,10 +10,11 @@ import ( type __ibiosequence__ struct { channel chan BioSequence current BioSequence + pushBack bool all_done *sync.WaitGroup buffer_size int finished bool - p_finished *bool + pFinished *bool } type IBioSequence struct { @@ -55,10 +56,13 @@ func MakeIBioSequence(sizes ...int) IBioSequence { i := __ibiosequence__{ channel: make(chan BioSequence, buffsize), current: NilBioSequence, + pushBack: false, buffer_size: buffsize, finished: false, - p_finished: nil} - i.p_finished = &i.finished + pFinished: nil, + } + + i.pFinished = &i.finished waiting := sync.WaitGroup{} i.all_done = &waiting ii := IBioSequence{&i} @@ -66,23 +70,32 @@ func MakeIBioSequence(sizes ...int) IBioSequence { } func (iterator IBioSequence) Split() IBioSequence { + i := __ibiosequence__{ channel: iterator.pointer.channel, current: NilBioSequence, + pushBack: false, finished: false, all_done: iterator.pointer.all_done, buffer_size: iterator.pointer.buffer_size, - p_finished: iterator.pointer.p_finished} + pFinished: iterator.pointer.pFinished, + } + newIter := IBioSequence{&i} return newIter } func (iterator IBioSequence) Next() bool { - if iterator.IsNil() || *(iterator.pointer.p_finished) { + if iterator.IsNil() || *(iterator.pointer.pFinished) { iterator.pointer.current = NilBioSequence return false } + if iterator.pointer.pushBack { + iterator.pointer.pushBack = false + return true + } + next, ok := (<-iterator.pointer.channel) if ok { @@ -91,10 +104,16 @@ func (iterator IBioSequence) Next() bool { } iterator.pointer.current = NilBioSequence - *iterator.pointer.p_finished = true + *iterator.pointer.pFinished = true return false } +func (iterator IBioSequence) PushBack() { + if !iterator.pointer.current.IsNil() { + iterator.pointer.pushBack = true + } +} + // The 'Get' method returns the instance of BioSequence // currently pointed by the iterator. You have to use the // 'Next' method to move to the next entry before calling @@ -106,7 +125,7 @@ func (iterator IBioSequence) Get() BioSequence { // Finished returns 'true' value if no more data is available // from the iterator. func (iterator IBioSequence) Finished() bool { - return *iterator.pointer.p_finished + return *iterator.pointer.pFinished } func (iterator IBioSequence) BufferSize() int { diff --git a/pkg/obiseq/pairedbatchiterator.go b/pkg/obiseq/pairedbatchiterator.go index a03205a..973492a 100644 --- a/pkg/obiseq/pairedbatchiterator.go +++ b/pkg/obiseq/pairedbatchiterator.go @@ -55,6 +55,7 @@ func (batch PairedBioSequenceBatch) IsNil() bool { type __ipairedbiosequencebatch__ struct { channel chan PairedBioSequenceBatch current PairedBioSequenceBatch + pushBack bool all_done *sync.WaitGroup buffer_size int finished bool @@ -77,9 +78,12 @@ func MakeIPairedBioSequenceBatch(sizes ...int) IPairedBioSequenceBatch { i := __ipairedbiosequencebatch__{ channel: make(chan PairedBioSequenceBatch, buffsize), current: NilPairedBioSequenceBatch, + pushBack: false, buffer_size: buffsize, finished: false, - p_finished: nil} + p_finished: nil, + } + i.p_finished = &i.finished waiting := sync.WaitGroup{} i.all_done = &waiting @@ -115,6 +119,7 @@ func (iterator IPairedBioSequenceBatch) Split() IPairedBioSequenceBatch { i := __ipairedbiosequencebatch__{ channel: iterator.pointer.channel, current: NilPairedBioSequenceBatch, + pushBack: false, all_done: iterator.pointer.all_done, buffer_size: iterator.pointer.buffer_size, finished: false, @@ -127,6 +132,12 @@ func (iterator IPairedBioSequenceBatch) Next() bool { if *(iterator.pointer.p_finished) { return false } + + if iterator.pointer.pushBack { + iterator.pointer.pushBack = false + return true + } + next, ok := (<-iterator.pointer.channel) if ok { @@ -139,6 +150,12 @@ func (iterator IPairedBioSequenceBatch) Next() bool { return false } +func (iterator IPairedBioSequenceBatch) PushBack() { + if !iterator.pointer.current.IsNil() { + iterator.pointer.pushBack = true + } +} + // The 'Get' method returns the instance of BioSequenceBatch // currently pointed by the iterator. You have to use the // 'Next' method to move to the next entry before calling diff --git a/pkg/obiseq/pool.go b/pkg/obiseq/pool.go index ed50b01..a183d50 100644 --- a/pkg/obiseq/pool.go +++ b/pkg/obiseq/pool.go @@ -14,7 +14,6 @@ var __bioseq__pool__ = sync.Pool{ func MakeEmptyBioSequence() BioSequence { bs := BioSequence{__bioseq__pool__.Get().(*__sequence__)} - bs.Reset() return bs } @@ -23,12 +22,13 @@ func MakeBioSequence(id string, definition string) BioSequence { bs := MakeEmptyBioSequence() bs.SetId(id) - bs.SetSequence(sequence) + bs.Write(sequence) bs.SetDefinition(definition) return bs } -func (sequence *BioSequence) Destroy() { +func (sequence *BioSequence) Recycle() { + sequence.Reset() __bioseq__pool__.Put(sequence.sequence) sequence.sequence = nil } diff --git a/pkg/obiseq/workers.go b/pkg/obiseq/workers.go index 3b17df5..81f1685 100644 --- a/pkg/obiseq/workers.go +++ b/pkg/obiseq/workers.go @@ -84,9 +84,10 @@ func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IB } log.Println("Start of the batch workers") - for i := 0; i < nworkers; i++ { + for i := 0; i < nworkers-1; i++ { go f(iterator.Split()) } + go f(iterator) return newIter } @@ -126,9 +127,10 @@ func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes } log.Println("Start of the batch slice workers") - for i := 0; i < nworkers; i++ { + for i := 0; i < nworkers - 1; i++ { go f(iterator.Split()) } + go f(iterator) return newIter } diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index b76ea05..f922c39 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -7,10 +7,11 @@ import ( "strings" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func __expand_list_of_files__(check_ext bool, filenames ...string) ([]string, error) { +func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { var err error list_of_files := make([]string, 0, 100) for _, fn := range filenames { @@ -32,7 +33,7 @@ func __expand_list_of_files__(check_ext bool, filenames ...string) ([]string, er if info.IsDir() { if path != fn { - subdir, e := __expand_list_of_files__(true, path) + subdir, e := _ExpandListOfFiles(true, path) if e != nil { return e } @@ -80,6 +81,15 @@ func ReadBioSequencesBatch(filenames ...string) (obiseq.IBioSequenceBatch, error opts = append(opts, obiformats.OptionsFastSeqHeaderParser(obiformats.ParseGuessedFastSeqHeader)) } + nworkers := obioptions.ParallelWorkers() / 4 + if nworkers < 2 { + nworkers = 2 + } + + opts = append(opts, obiformats.OptionsParallelWorkers(nworkers)) + opts = append(opts, obiformats.OptionsBufferSize(obioptions.BufferSize())) + opts = append(opts, obiformats.OptionsBatchSize(obioptions.BatchSize())) + opts = append(opts, obiformats.OptionsQualityShift(InputQualityShift())) if len(filenames) == 0 { @@ -94,7 +104,7 @@ func ReadBioSequencesBatch(filenames ...string) (obiseq.IBioSequenceBatch, error } } else { - list_of_files, err := __expand_list_of_files__(false, filenames...) + list_of_files, err := _ExpandListOfFiles(false, filenames...) if err != nil { return obiseq.NilIBioSequenceBatch, err } diff --git a/pkg/obitools/obiconvert/sequence_writer.go b/pkg/obitools/obiconvert/sequence_writer.go index 610a0a4..ccc6612 100644 --- a/pkg/obitools/obiconvert/sequence_writer.go +++ b/pkg/obitools/obiconvert/sequence_writer.go @@ -4,6 +4,7 @@ import ( "log" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -23,6 +24,15 @@ func WriteBioSequences(iterator obiseq.IBioSequence, filenames ...string) error opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) } + nworkers := obioptions.ParallelWorkers() / 4 + if nworkers < 2 { + nworkers = 2 + } + + opts = append(opts, obiformats.OptionsParallelWorkers(nworkers)) + opts = append(opts, obiformats.OptionsBufferSize(obioptions.BufferSize())) + opts = append(opts, obiformats.OptionsBatchSize(obioptions.BatchSize())) + opts = append(opts, obiformats.OptionsQualityShift(OutputQualityShift())) var err error @@ -54,3 +64,68 @@ func WriteBioSequences(iterator obiseq.IBioSequence, filenames ...string) error return nil } + +func WriteBioSequencesBatch(iterator obiseq.IBioSequenceBatch, + terminalAction bool, filenames ...string) (obiseq.IBioSequenceBatch, error) { + + var newIter obiseq.IBioSequenceBatch + + opts := make([]obiformats.WithOption, 0, 10) + + switch OutputFastHeaderFormat() { + case "json": + log.Println("On output use JSON headers") + opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) + case "obi": + log.Println("On output use OBI headers") + opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqOBIHeader)) + default: + log.Println("On output use JSON headers") + opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) + } + + nworkers := obioptions.ParallelWorkers() / 4 + if nworkers < 2 { + nworkers = 2 + } + + opts = append(opts, obiformats.OptionsParallelWorkers(nworkers)) + opts = append(opts, obiformats.OptionsBufferSize(obioptions.BufferSize())) + opts = append(opts, obiformats.OptionsBatchSize(obioptions.BatchSize())) + + opts = append(opts, obiformats.OptionsQualityShift(OutputQualityShift())) + + var err error + + if len(filenames) == 0 { + switch OutputFormat() { + case "fastq": + newIter, err = obiformats.WriteFastqBatchToStdout(iterator, opts...) + case "fasta": + newIter, err = obiformats.WriteFastaBatchToStdout(iterator, opts...) + default: + newIter, err = obiformats.WriteSequencesBatchToStdout(iterator, opts...) + } + } else { + switch OutputFormat() { + case "fastq": + newIter, err = obiformats.WriteFastqBatchToFile(iterator, filenames[0], opts...) + case "fasta": + newIter, err = obiformats.WriteFastaBatchToFile(iterator, filenames[0], opts...) + default: + newIter, err = obiformats.WriteSequencesBatchToFile(iterator, filenames[0], opts...) + } + } + + if err != nil { + log.Fatalf("Write file error: %v", err) + return obiseq.NilIBioSequenceBatch, err + } + + if terminalAction { + newIter.Recycle() + return obiseq.NilIBioSequenceBatch, nil + } + + return newIter, nil +} diff --git a/pkg/obitools/obipairing/pairing.go b/pkg/obitools/obipairing/pairing.go index 0cca250..53f24bd 100644 --- a/pkg/obitools/obipairing/pairing.go +++ b/pkg/obitools/obipairing/pairing.go @@ -4,6 +4,7 @@ import ( "log" "math" "os" + "runtime" "time" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obialign" @@ -11,34 +12,40 @@ import ( "github.com/schollz/progressbar/v3" ) -func __abs__(x int) int { +func _Abs(x int) int { if x < 0 { return -x } return x } -func JoinPairedSequence(seqA, seqB obiseq.BioSequence) obiseq.BioSequence { - js := make([]byte, seqA.Length(), seqA.Length()+seqB.Length()+10) - jq := make([]byte, seqA.Length(), seqA.Length()+seqB.Length()+10) +func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioSequence { - copy(js, seqA.Sequence()) - copy(jq, seqA.Qualities()) + if !inplace { + seqA = seqA.Copy() + } - js = append(js, '.', '.', '.', '.', '.', '.', '.', '.', '.', '.') - jq = append(jq, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + seqA.WriteString("..........") + seqA.Write(seqB.Sequence()) - js = append(js, seqB.Sequence()...) - jq = append(jq, seqB.Qualities()...) + seqA.WriteQualities(obiseq.Quality{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) + seqA.WriteQualities(seqB.Qualities()) - rep := obiseq.MakeBioSequence(seqA.Id(), js, seqA.Definition()) - rep.SetQualities(jq) - - return rep + return seqA } +// AssemblePESequences assembles two paired sequences following +// the obipairing strategy implemented in obialign.PEAlign using +// the gap and delta parametters. +// If the length of the overlap between both sequences is less than +// overlap_min, The alignment is substituted by a simple pasting +// of the sequences with a strech of 10 dots in between them. +// the quality of the dots is set to 0. +// If the inplace parameter is set to true, the seqA and seqB are +// destroyed during the assembling process and cannot be reuse later on. func AssemblePESequences(seqA, seqB obiseq.BioSequence, gap, delta, overlap_min int, with_stats bool, + inplace bool, arena_align obialign.PEAlignArena, arena_cons obialign.BuildAlignArena, arena_qual obialign.BuildAlignArena) obiseq.BioSequence { @@ -53,7 +60,7 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence, right = path[len(path)-2] } lcons := cons.Length() - ali_length := lcons - __abs__(left) - __abs__(right) + ali_length := lcons - _Abs(left) - _Abs(right) if ali_length >= overlap_min { if with_stats { @@ -85,14 +92,22 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence, annot["seq_ab_match"] = match annot["score_norm"] = score_norm + if inplace { + (&seqA).Recycle() + (&seqB).Recycle() + } } } else { - cons = JoinPairedSequence(seqA, seqB) + cons = JoinPairedSequence(seqA, seqB, inplace) if with_stats { annot := cons.Annotations() annot["mode"] = "join" } + + if inplace { + (&seqB).Recycle() + } } return cons @@ -101,7 +116,7 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence, func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, gap, delta, overlap_min int, with_stats bool, sizes ...int) obiseq.IBioSequenceBatch { - nworkers := 7 + nworkers := runtime.NumCPU() - 1 buffsize := iterator.BufferSize() if len(sizes) > 0 { @@ -148,13 +163,11 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, processed := 0 for i, A := range batch.Forward() { B := batch.Reverse()[i] - cons[i] = AssemblePESequences(A, B, 2, 5, 20, true, arena, barena1, barena2) + cons[i] = AssemblePESequences(A, B, 2, 5, 20, true, true, arena, barena1, barena2) if i%59 == 0 { bar.Add(59) processed += 59 } - A.Destroy() - B.Destroy() } bar.Add(batch.Length() - processed) newIter.Channel() <- obiseq.MakeBioSequenceBatch( @@ -169,9 +182,10 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, log.Printf("Start of the sequence Pairing") - for i := 0; i < nworkers; i++ { + for i := 0; i < nworkers-1; i++ { go f(iterator.Split(), i) } + go f(iterator, nworkers-1) return newIter diff --git a/pkg/obitools/obipcr/options.go b/pkg/obitools/obipcr/options.go index 367def0..d81f3ff 100644 --- a/pkg/obitools/obipcr/options.go +++ b/pkg/obitools/obipcr/options.go @@ -15,6 +15,8 @@ var _AllowedMismatch = 0 var _MinimumLength = 0 var _MaximumLength = -1 +// PCROptionSet adds to a command line option set every options +// needed by the PCR algorithm. func PCROptionSet(options *getoptions.GetOpt) { options.BoolVar(&_Circular, "circular", false, options.Alias("c"), @@ -40,11 +42,15 @@ func PCROptionSet(options *getoptions.GetOpt) { options.Description("Maximum length of the barcode (primers excluded).")) } +// OptionSet adds to the basic option set every options declared for +// the obipcr command func OptionSet(options *getoptions.GetOpt) { obiconvert.OptionSet(options) PCROptionSet(options) } +// ForwardPrimer returns the sequence of the forward primer as indicated by the +// --forward command line option func ForwardPrimer() string { pattern, err := obiapat.MakeApatPattern(_ForwardPrimer, _AllowedMismatch) @@ -57,6 +63,8 @@ func ForwardPrimer() string { return _ForwardPrimer } +// ReversePrimer returns the sequence of the reverse primer as indicated by the +// --reverse command line option func ReversePrimer() string { pattern, err := obiapat.MakeApatPattern(_ReversePrimer, _AllowedMismatch) @@ -69,18 +77,27 @@ func ReversePrimer() string { return _ReversePrimer } +// AllowedMismatch returns the allowed mistmatch count between each +// primer and the sequences as indicated by the +// --allowed-mismatches|-e command line option func AllowedMismatch() int { return _AllowedMismatch } +// Circular returns the considered sequence topology as indicated by the +// --circular|-c command line option func Circular() bool { return _Circular } +// MinLength returns the amplicon minimum length as indicated by the +// --min-length|-l command line option func MinLength() int { return _MinimumLength } +// MaxLength returns the amplicon maximum length as indicated by the +// --max-length|-L command line option func MaxLength() int { return _MaximumLength } diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index 48a40b2..02b3285 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -5,7 +5,10 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func PCR(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequence, error) { +// PCR iterates over sequences provided by a obiseq.IBioSequenceBatch +// and returns an other obiseq.IBioSequenceBatch distributing +// obiseq.BioSequenceBatch containing the selected amplicon sequences. +func PCR(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequenceBatch, error) { forward := ForwardPrimer() reverse := ReversePrimer() @@ -28,5 +31,5 @@ func PCR(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequence, error) { worker := obiapat.PCRSliceWorker(forward, reverse, opts...) - return iterator.MakeISliceWorker(worker).IBioSequence(), nil + return iterator.MakeISliceWorker(worker), nil }