From e8c55a2b6bdae63516a7f752d52a5a04645682d2 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 13 Oct 2023 21:52:57 +0200 Subject: [PATCH] optimize sequence readers and patch a bug in the format guesser Former-commit-id: 9dce1e96c57ae9a88c26fac5c8e1bdcdc2c0c7a5 --- pkg/obiapat/pcr.go | 124 ++++++++++++++++++++++--------- pkg/obiformats/embl_read.go | 8 +- pkg/obiformats/fastaseq_read.go | 16 +++- pkg/obiformats/fastqseq_read.go | 14 +++- pkg/obiformats/universal_read.go | 16 +++- pkg/obitools/obipcr/options.go | 8 ++ pkg/obitools/obipcr/pcr.go | 1 + 7 files changed, 141 insertions(+), 46 deletions(-) diff --git a/pkg/obiapat/pcr.go b/pkg/obiapat/pcr.go index 3585037..fef27e5 100644 --- a/pkg/obiapat/pcr.go +++ b/pkg/obiapat/pcr.go @@ -14,6 +14,7 @@ type _Options struct { forwardError int reverseError int extension int + fullExtension bool batchSize int parallelWorkers int forward ApatPattern @@ -41,6 +42,10 @@ func (options Options) Extension() int { return options.pointer.extension } +func (options Options) OnlyFullExtension() bool { + return options.pointer.fullExtension +} + // MinLength method returns minimum length of // the searched amplicon (length of the primers // excluded) @@ -96,6 +101,7 @@ func MakeOptions(setters []WithOption) Options { forwardError: 0, reverseError: 0, extension: -1, + fullExtension: false, circular: false, parallelWorkers: 4, batchSize: 100, @@ -172,6 +178,14 @@ func OptionWithExtension(extension int) WithOption { return f } +func OptionOnlyFullExtension(full bool) WithOption { + f := WithOption(func(opt Options) { + opt.pointer.fullExtension = full + }) + + return f +} + // OptionForwardError sets the number of // error allowed when matching the forward // primer. @@ -285,31 +299,51 @@ func _Pcr(seq ApatSequence, from = fm[1] to = rm[0] } - amplicon, _ := seq.pointer.reference.Subsequence(from, to, opt.pointer.circular) - log.Debugf("seq length : %d capacity : %d", amplicon.Len(), cap(amplicon.Sequence())) - annot := amplicon.Annotations() - obiutils.MustFillMap(annot, seq.pointer.reference.Annotations()) - annot["forward_primer"] = forward.String() + if opt.HasExtension() && !opt.OnlyFullExtension() && !opt.Circular() { + if from < 0 { + from = 0 + } + if to > seq.Len() { + to = seq.Len() + } + } - match, _ := seq.pointer.reference.Subsequence(fm[0], fm[1], opt.pointer.circular) - annot["forward_match"] = match.String() - match.Recycle() + if (opt.HasExtension() && ((from >= 0 && to <= seq.Len()) || opt.Circular())) || + !opt.HasExtension() { - annot["forward_error"] = erri + amplicon, error := seq.pointer.reference.Subsequence(from, to, opt.Circular()) - annot["reverse_primer"] = reverse.String() - match, _ = seq.pointer.reference.Subsequence(rm[0], rm[1], opt.pointer.circular) - match = match.ReverseComplement(true) - annot["reverse_match"] = match.String() - match.Recycle() + if error != nil { + log.Fatalf("error : %v\n", error) + } - annot["reverse_error"] = errj - annot["direction"] = "forward" + log.Debugf("seq length : %d capacity : %d", amplicon.Len(), cap(amplicon.Sequence())) + annot := amplicon.Annotations() + obiutils.MustFillMap(annot, seq.pointer.reference.Annotations()) - // log.Debugf("amplicon sequence capacity : %d", cap(amplicon.Sequence())) + annot["forward_primer"] = forward.String() - results = append(results, amplicon) + match, _ := seq.pointer.reference.Subsequence(fm[0], fm[1], opt.pointer.circular) + annot["forward_match"] = match.String() + match.Recycle() + + annot["forward_error"] = erri + + annot["reverse_primer"] = reverse.String() + match, _ = seq.pointer.reference.Subsequence(rm[0], rm[1], opt.pointer.circular) + match = match.ReverseComplement(true) + annot["reverse_match"] = match.String() + match.Recycle() + + annot["reverse_error"] = errj + annot["direction"] = "forward" + + // log.Debugf("amplicon sequence capacity : %d", cap(amplicon.Sequence())) + + results = append(results, amplicon) + + } } } } @@ -370,30 +404,48 @@ func _Pcr(seq ApatSequence, from = fm[1] to = rm[0] } - amplicon, _ := seq.pointer.reference.Subsequence(from, to, opt.pointer.circular) - amplicon = amplicon.ReverseComplement(true) - annot := amplicon.Annotations() - obiutils.MustFillMap(annot, seq.pointer.reference.Annotations()) - annot["forward_primer"] = forward.String() + if opt.HasExtension() && !opt.OnlyFullExtension() && !opt.Circular() { + if from < 0 { + from = 0 + } + if to > seq.Len() { + to = seq.Len() + } + } - match, _ := seq.pointer.reference.Subsequence(rm[0], rm[1], opt.pointer.circular) - match.ReverseComplement(true) - annot["forward_match"] = match.String() - match.Recycle() + if (opt.HasExtension() && ((from >= 0 && to <= seq.Len()) || opt.Circular())) || + !opt.HasExtension() { + amplicon, error := seq.pointer.reference.Subsequence(from, to, opt.pointer.circular) - annot["forward_error"] = errj + if error != nil { + log.Fatalf("error : %v\n", error) + } - annot["reverse_primer"] = reverse.String() - match, _ = seq.pointer.reference.Subsequence(fm[0], fm[1], opt.pointer.circular) - annot["reverse_match"] = match.String() - match.Recycle() + amplicon = amplicon.ReverseComplement(true) - annot["reverse_error"] = erri - annot["direction"] = "reverse" + annot := amplicon.Annotations() + obiutils.MustFillMap(annot, seq.pointer.reference.Annotations()) + annot["forward_primer"] = forward.String() - results = append(results, amplicon) - // log.Debugf("amplicon sequence capacity : %d", cap(amplicon.Sequence())) + match, _ := seq.pointer.reference.Subsequence(rm[0], rm[1], opt.pointer.circular) + match.ReverseComplement(true) + annot["forward_match"] = match.String() + match.Recycle() + + annot["forward_error"] = errj + + annot["reverse_primer"] = reverse.String() + match, _ = seq.pointer.reference.Subsequence(fm[0], fm[1], opt.pointer.circular) + annot["reverse_match"] = match.String() + match.Recycle() + + annot["reverse_error"] = erri + annot["direction"] = "reverse" + + results = append(results, amplicon) + // log.Debugf("amplicon sequence capacity : %d", cap(amplicon.Sequence())) + } } } } diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index 3b9f7c2..947a3fe 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -192,8 +192,10 @@ func _ReadFlatFileChunk(reader io.Reader, readers chan _FileChunk) { for err == nil { // Read from the reader until the buffer is full or the end of the file is reached - for ; err == nil && l < len(buff); l += size { - size, err = reader.Read(buff[l:]) + l, err = io.ReadFull(reader, buff) + + if err == io.ErrUnexpectedEOF { + err = nil } // Create an extended buffer to read from if the end of the last entry is not found in the current buffer @@ -205,7 +207,7 @@ func _ReadFlatFileChunk(reader io.Reader, readers chan _FileChunk) { // Read from the reader in 1 MB increments until the end of the last entry is found for end = _EndOfLastEntry(buff); err == nil && end < 0; end = _EndOfLastEntry(extbuff[:size]) { ic++ - size, err = reader.Read(extbuff) + size, err = io.ReadFull(reader, extbuff) buff = append(buff, extbuff[:size]...) } diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index b41c1e8..34b439e 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -92,7 +92,12 @@ func FastaChunkReader(r io.Reader, size int, cutHead bool) (chan FastxChunk, err out := make(chan FastxChunk) buff := make([]byte, size) - n, err := r.Read(buff) + n, err := io.ReadFull(r, buff) + + if err == io.ErrUnexpectedEOF { + err = nil + } + if n > 0 && err == nil { if n < size { buff = buff[:n] @@ -128,13 +133,20 @@ func FastaChunkReader(r io.Reader, size int, cutHead bool) (chan FastxChunk, err index: idx, } idx++ + } else { + size = size * 2 } buff = slices.Grow(buff[:0], size)[0:size] - n, err = r.Read(buff) + n, err = io.ReadFull(r, buff) if n < size { buff = buff[:n] } + + if err == io.ErrUnexpectedEOF { + err = nil + } + // fmt.Printf("n = %d, err = %v\n", n, err) } diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index e500f21..4b81d05 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -99,7 +99,11 @@ func FastqChunkReader(r io.Reader, size int) (chan FastxChunk, error) { out := make(chan FastxChunk) buff := make([]byte, size) - n, err := r.Read(buff) + n, err := io.ReadFull(r, buff) + + if err == io.ErrUnexpectedEOF { + err = nil + } if n > 0 && err == nil { if n < size { @@ -130,13 +134,19 @@ func FastqChunkReader(r io.Reader, size int) (chan FastxChunk, error) { index: idx, } idx++ + } else { + size = size * 2 } buff = slices.Grow(buff[:0], size)[0:size] - n, err = r.Read(buff) + n, err = io.ReadFull(r, buff) if n < size { buff = buff[:n] } + + if err == io.ErrUnexpectedEOF { + err = nil + } // fmt.Printf("n = %d, err = %v\n", n, err) } diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index bd7660b..e2d2f5f 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -69,11 +69,17 @@ func OBIMimeTypeGuesser(stream io.Reader) (*mimetype.MIME, io.Reader, error) { mimetype.Lookup("text/plain").Extend(genbankDetector, "text/genbank", ".seq") mimetype.Lookup("text/plain").Extend(emblDetector, "text/embl", ".dat") + mimetype.Lookup("application/octet-stream").Extend(fastaDetector, "text/fasta", ".fasta") + mimetype.Lookup("application/octet-stream").Extend(fastqDetector, "text/fastq", ".fastq") + mimetype.Lookup("application/octet-stream").Extend(ecoPCR2Detector, "text/ecopcr2", ".ecopcr") + mimetype.Lookup("application/octet-stream").Extend(genbankDetector, "text/genbank", ".seq") + mimetype.Lookup("application/octet-stream").Extend(emblDetector, "text/embl", ".dat") + // Create a buffer to store the read data buf := make([]byte, 1024*128) - n, err := stream.Read(buf) + n, err := io.ReadFull(stream, buf) - if err != nil && err != io.EOF { + if err != nil && err != io.ErrUnexpectedEOF { return nil, nil, err } @@ -84,7 +90,11 @@ func OBIMimeTypeGuesser(stream io.Reader) (*mimetype.MIME, io.Reader, error) { } // Create a new reader based on the read data - newReader := io.MultiReader(bytes.NewReader(buf[:n]), stream) + newReader := io.Reader(bytes.NewReader(buf[:n])) + + if err == nil { + newReader = io.MultiReader(newReader, stream) + } return mimeType, newReader, nil } diff --git a/pkg/obitools/obipcr/options.go b/pkg/obitools/obipcr/options.go index a2f0b44..d4fc07c 100644 --- a/pkg/obitools/obipcr/options.go +++ b/pkg/obitools/obipcr/options.go @@ -18,6 +18,7 @@ var _MinimumLength = 0 var _MaximumLength = -1 var _Fragmented = false var _Delta = -1 +var _OnlyFull = false // PCROptionSet defines every options related to a simulated PCR. // @@ -58,6 +59,9 @@ func PCROptionSet(options *getoptions.GetOpt) { options.IntVar(&_Delta, "delta", -1, options.Alias("D"), options.Description("Lenght of the sequence fragment to be added to the barcode extremities.")) + options.BoolVar(&_OnlyFull, "only-complete-flanking", false, + options.Description("Only fragments with complete flanking sequences are printed.")) + } // OptionSet adds to the basic option set every options declared for @@ -131,3 +135,7 @@ func CLIWithExtension() bool { func CLIExtension() int { return _Delta } + +func CLIOnlyFull() bool { + return _OnlyFull +} diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index e290540..59cf21c 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -24,6 +24,7 @@ func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) { CLIReversePrimer(), CLIAllowedMismatch(), ), + obiapat.OptionOnlyFullExtension(CLIOnlyFull()), ) if CLIMinLength() > 0 {