From 38e4655f38656b1975f02e45af6e9325a059177a Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 7 Feb 2022 11:51:35 +0100 Subject: [PATCH] Correct for a strange bug... --- cmd/obitools/obipairing/main.go | 3 +- pkg/obiformats/binary_write.go | 153 +++++++++++++++++++++++++++++ pkg/obingslibrary/worker.go | 9 +- pkg/obioptions/options.go | 1 - pkg/obitools/obipairing/options.go | 2 +- pkg/obitools/obipairing/pairing.go | 5 +- pkg/obitools/obipcr/pcr.go | 4 +- 7 files changed, 163 insertions(+), 14 deletions(-) create mode 100644 pkg/obiformats/binary_write.go diff --git a/cmd/obitools/obipairing/main.go b/cmd/obitools/obipairing/main.go index 90eea2d..ae10040 100644 --- a/cmd/obitools/obipairing/main.go +++ b/cmd/obitools/obipairing/main.go @@ -7,7 +7,6 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obimultiplex" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipairing" ) @@ -29,7 +28,7 @@ func main() { trace.Start(ftrace) defer trace.Stop() - optionParser := obioptions.GenerateOptionParser(obimultiplex.OptionSet) + optionParser := obioptions.GenerateOptionParser(obipairing.OptionSet) optionParser(os.Args) pairs, _ := obipairing.IBatchPairedSequence() diff --git a/pkg/obiformats/binary_write.go b/pkg/obiformats/binary_write.go new file mode 100644 index 0000000..d61c2d8 --- /dev/null +++ b/pkg/obiformats/binary_write.go @@ -0,0 +1,153 @@ +package obiformats + +import ( + "io" + "log" + "os" + "time" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "github.com/renproject/surge" +) + +func WriteBinary(iterator obiseq.IBioSequence, file io.Writer) error { + singleseq := make(obiseq.BioSequenceSlice, 1) + blob := make([]byte, 0, 1024) + for iterator.Next() { + singleseq[0] = iterator.Get() + blobsize := singleseq.SizeHint() + if blobsize > cap(blob) { + blob = make([]byte, 0, blobsize*2+8) + } + _, _, err := surge.MarshalI64(int64(blobsize), blob, 8) + if err != nil { + return err + } + data := blob[8 : 8+blobsize] + _, _, err = singleseq.Marshal(data, blobsize) + if err != nil { + return err + } + + file.Write(blob[0 : 8+blobsize]) + } + + return nil +} + +func WriteBinaryToFile(iterator obiseq.IBioSequence, + filename string) error { + + file, err := os.Create(filename) + + if err != nil { + log.Fatalf("open file error: %v", err) + return err + } + + return WriteBinary(iterator, file) +} + +func WriteBinaryToStdout(iterator obiseq.IBioSequence) error { + return WriteBinary(iterator, os.Stdout) +} + +func WriteBinaryBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) { + opt := MakeOptions(options) + + buffsize := iterator.BufferSize() + newIter := obiseq.MakeIBioSequenceBatch(buffsize) + + nwriters := opt.ParallelWorkers() + + chunkchan := make(chan FileChunck) + + newIter.Add(nwriters) + + go func() { + newIter.Wait() + for len(chunkchan) > 0 { + time.Sleep(time.Millisecond) + } + close(chunkchan) + for len(newIter.Channel()) > 0 { + time.Sleep(time.Millisecond) + } + close(newIter.Channel()) + }() + + ff := func(iterator obiseq.IBioSequenceBatch) { + blob := make([]byte, 0, 10240) + for iterator.Next() { + batch := iterator.Get() + blobsize := batch.Slice().SizeHint() + if blobsize > cap(blob) { + blob = make([]byte, 0, blobsize*2+8) + } + _, _, err := surge.MarshalI64(int64(blobsize), blob, 8) + if err != nil { + log.Fatalf("error in reading binary file %v\n", err) + } + data := blob[8 : 8+blobsize] + _, _, err = batch.Slice().Marshal(data, blobsize) + if err != nil { + log.Fatalf("error in reading binary file %v\n", err) + } + + chunkchan <- FileChunck{ + data, + batch.Order(), + } + newIter.Channel() <- batch + } + newIter.Done() + } + + log.Println("Start of the binary file writing") + go ff(iterator) + for i := 0; i < nwriters-1; i++ { + go ff(iterator.Split()) + } + + next_to_send := 0 + received := make(map[int]FileChunck, 100) + + go func() { + for chunk := range chunkchan { + if chunk.order == next_to_send { + file.Write(chunk.text) + next_to_send++ + chunk, ok := received[next_to_send] + for ok { + file.Write(chunk.text) + delete(received, next_to_send) + next_to_send++ + chunk, ok = received[next_to_send] + } + } else { + received[chunk.order] = chunk + } + + } + }() + + return newIter, nil +} + +func WriteBinaryBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { + return WriteBinaryBatch(iterator, os.Stdout, options...) +} + +func WriteBinaryBatchToFile(iterator obiseq.IBioSequenceBatch, + filename string, + options ...WithOption) (obiseq.IBioSequenceBatch, error) { + + file, err := os.Create(filename) + + if err != nil { + log.Fatalf("open file error: %v", err) + return obiseq.NilIBioSequenceBatch, err + } + + return WriteBinaryBatch(iterator, file, options...) +} diff --git a/pkg/obingslibrary/worker.go b/pkg/obingslibrary/worker.go index 76c3568..aa1ed89 100644 --- a/pkg/obingslibrary/worker.go +++ b/pkg/obingslibrary/worker.go @@ -143,11 +143,11 @@ func MakeOptions(setters []WithOption) Options { func _ExtractBarcodeSlice(ngslibrary NGSLibrary, sequences obiseq.BioSequenceSlice, options Options) obiseq.BioSequenceSlice { - newSlice := make(obiseq.BioSequenceSlice,0,len(sequences)) - + newSlice := make(obiseq.BioSequenceSlice, 0, len(sequences)) + for _, seq := range sequences { - s, err := ngslibrary.ExtractBarcode(seq,true) - if err==nil || ! options.pointer.discardErrors { + s, err := ngslibrary.ExtractBarcode(seq, true) + if err == nil || !options.pointer.discardErrors { newSlice = append(newSlice, s) } } @@ -179,4 +179,3 @@ func ExtractBarcodeSliceWorker(ngslibrary NGSLibrary, return worker } - diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index bba7f6d..54d8d99 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -27,7 +27,6 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser options.Description("Number of parallele threads computing the result")) options.IntVar(&_MaxAllowedCPU, "max-cpu", _MaxAllowedCPU, - options.Alias("w"), options.Description("Number of parallele threads computing the result")) for _, o := range optionset { diff --git a/pkg/obitools/obipairing/options.go b/pkg/obitools/obipairing/options.go index 0a70b21..c16e4d4 100644 --- a/pkg/obitools/obipairing/options.go +++ b/pkg/obitools/obipairing/options.go @@ -30,7 +30,7 @@ func PairingOptionSet(options *getoptions.GetOpt) { options.Alias("O"), options.Description("Minimum ovelap between both the reads to consider the aligment")) options.Float64Var(&_MinIdentity, "min-identity", _MinIdentity, - options.Alias("O"), + options.Alias("X"), options.Description("Minimum identity between ovelaped regions of the reads to consider the aligment")) options.Float64Var(&_GapPenality, "gap-penality", _GapPenality, options.Alias("G"), diff --git a/pkg/obitools/obipairing/pairing.go b/pkg/obitools/obipairing/pairing.go index fac1553..ae62aac 100644 --- a/pkg/obitools/obipairing/pairing.go +++ b/pkg/obitools/obipairing/pairing.go @@ -105,7 +105,7 @@ func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioS // input sequence. // func AssemblePESequences(seqA, seqB obiseq.BioSequence, - gap float64, delta, minOverlap int, minIdentity float64,withStats bool, + gap float64, delta, minOverlap int, minIdentity float64, withStats bool, inplace bool, arenaAlign obialign.PEAlignArena) obiseq.BioSequence { @@ -119,7 +119,7 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence, } lcons := cons.Length() aliLength := lcons - _Abs(left) - _Abs(right) - identity := float64(match)/float64(aliLength) + identity := float64(match) / float64(aliLength) if aliLength >= minOverlap && identity >= minIdentity { if withStats { @@ -268,7 +268,6 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, go f(iterator.Split(), i) } go f(iterator, nworkers-1) - return newIter } diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index 575973b..2dd6b41 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -14,11 +14,11 @@ func PCR(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequenceBatch, error) { opts = append(opts, obiapat.OptionForwardPrimer( - ForwardPrimer(), + ForwardPrimer(), AllowedMismatch(), ), obiapat.OptionReversePrimer( - ReversePrimer(), + ReversePrimer(), AllowedMismatch(), ), )