From 4551df08b127c643e76377fa666bf04c86391b8a Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 18 Jan 2022 13:09:32 +0100 Subject: [PATCH] Adds a reader for NGS filter files and change some API for the apat library --- cmd/obitools/obimultiplex/main.go | 34 +++++++ cmd/obitools/obipcr/main.go | 16 ++-- cmd/test/main.go | 29 ++---- pkg/obiapat/pattern.go | 2 - pkg/obiapat/pcr.go | 119 ++++++++++++++--------- pkg/obiformats/embl_read.go | 6 +- pkg/obiformats/fastseq_obi_header.go | 20 ++-- pkg/obiformats/fastseq_write_fasta.go | 2 +- pkg/obiformats/fastseq_write_fastq.go | 2 +- pkg/obiformats/ngsfilter_read.go | 133 ++++++++++++++++++++++++++ pkg/obiformats/universal_write.go | 17 +++- pkg/obitools/obimultiplex/options.go | 1 + pkg/obitools/obipcr/pcr.go | 16 +++- 13 files changed, 301 insertions(+), 96 deletions(-) create mode 100644 cmd/obitools/obimultiplex/main.go create mode 100644 pkg/obiformats/ngsfilter_read.go create mode 100644 pkg/obitools/obimultiplex/options.go diff --git a/cmd/obitools/obimultiplex/main.go b/cmd/obitools/obimultiplex/main.go new file mode 100644 index 0000000..0cb8432 --- /dev/null +++ b/cmd/obitools/obimultiplex/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "os" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipcr" +) + +func main() { + + // f, err := os.Create("cpu.pprof") + // if err != nil { + // log.Fatal(err) + // } + // pprof.StartCPUProfile(f) + // defer pprof.StopCPUProfile() + + // ftrace, err := os.Create("cpu.trace") + // if err != nil { + // log.Fatal(err) + // } + // trace.Start(ftrace) + // defer trace.Stop() + + optionParser := obioptions.GenerateOptionParser(obipcr.OptionSet) + + _, args, _ := optionParser(os.Args) + + sequences, _ := obiconvert.ReadBioSequencesBatch(args...) + amplicons, _ := obipcr.PCR(sequences) + obiconvert.WriteBioSequencesBatch(amplicons, true) +} diff --git a/cmd/obitools/obipcr/main.go b/cmd/obitools/obipcr/main.go index 0cb8432..aec8621 100644 --- a/cmd/obitools/obipcr/main.go +++ b/cmd/obitools/obipcr/main.go @@ -1,7 +1,9 @@ package main import ( + "log" "os" + "runtime/trace" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -10,6 +12,7 @@ import ( func main() { + // go tool pprof -http=":8000" ./obipairing ./cpu.pprof // f, err := os.Create("cpu.pprof") // if err != nil { // log.Fatal(err) @@ -17,12 +20,13 @@ func main() { // pprof.StartCPUProfile(f) // defer pprof.StopCPUProfile() - // ftrace, err := os.Create("cpu.trace") - // if err != nil { - // log.Fatal(err) - // } - // trace.Start(ftrace) - // defer trace.Stop() + // go tool trace cpu.trace + ftrace, err := os.Create("cpu.trace") + if err != nil { + log.Fatal(err) + } + trace.Start(ftrace) + defer trace.Stop() optionParser := obioptions.GenerateOptionParser(obipcr.OptionSet) diff --git a/cmd/test/main.go b/cmd/test/main.go index 6d45244..393bcc6 100644 --- a/cmd/test/main.go +++ b/cmd/test/main.go @@ -6,7 +6,8 @@ import ( "os" "runtime/trace" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obialign" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiapat" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -41,27 +42,17 @@ func main() { // } A := []byte("ccgcctccttagaacaggctcctctagaaaaccatagtgggatatctaaagaaggcggagatagaaagagcggttcagcaggaatgccgagatggacggcgtgtgacg") - B := []byte("cgccaccaccgagatctacactctttccctacacgacgctcttccgatctccgcctccttagaacaggctcctctagaaaagcatagtggggtatctaaaggaggcgg") + // B := []byte("cgccaccaccgagatctacactctttccctacacgacgctcttccgatctccgcctccttagaacaggctcctctagaaaagcatagtggggtatctaaaggaggcgg") sA := obiseq.MakeBioSequence("A", A, "") - sB := obiseq.MakeBioSequence("B", B, "") + // sB := obiseq.MakeBioSequence("B", B, "") - fmt.Println(string(sA.Sequence())) - fmt.Println(sA.Qualities()) - fmt.Println(string(sB.Sequence())) - fmt.Println(sB.Qualities()) + pat, _ := obiapat.MakeApatPattern("TCCTTCCAACAGGCTCCTC", 3) + as, _ := obiapat.MakeApatSequence(sA, false) + fmt.Println(pat.FindAllIndex(as)) - score, path := obialign.PELeftAlign(sA, sB, 2, obialign.NilPEAlignArena) - fmt.Printf("Score : %d Path : %v\n", score, path) - score, path = obialign.PERightAlign(sA, sB, 2, obialign.NilPEAlignArena) - fmt.Printf("Score : %d Path : %v\n", score, path) + file, _ := os.Open("sample/wolf_diet_ngsfilter.txt") + xxx, _ := obiformats.ReadNGSFilter(file) - fmt.Println(string(sA.Sequence())) - sA.ReverseComplement(true) - fmt.Println(string(sA.Sequence())) - fmt.Println(string(sA.Id())) - - sA.Reset() - fmt.Println(sA.Length()) - fmt.Println(sA.String()) + fmt.Println(xxx) } diff --git a/pkg/obiapat/pattern.go b/pkg/obiapat/pattern.go index 0d02b7f..ee3ff33 100644 --- a/pkg/obiapat/pattern.go +++ b/pkg/obiapat/pattern.go @@ -202,8 +202,6 @@ func (pattern ApatPattern) FindAllIndex(sequence ApatSequence, limits ...int) (l C.int32_t(begin), C.int32_t(length+C.MAX_PAT_LEN))) - //log.Printf("match count : %d\n", nhits) - if nhits == 0 { return nil } diff --git a/pkg/obiapat/pcr.go b/pkg/obiapat/pcr.go index d780eb4..32a9e7a 100644 --- a/pkg/obiapat/pcr.go +++ b/pkg/obiapat/pcr.go @@ -1,6 +1,8 @@ package obiapat import ( + "log" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -14,6 +16,10 @@ type _Options struct { bufferSize int batchSize int parallelWorkers int + forward ApatPattern + cfwd ApatPattern + reverse ApatPattern + crev ApatPattern } // Options stores a set of option usable by the @@ -90,6 +96,10 @@ func MakeOptions(setters []WithOption) Options { parallelWorkers: 4, batchSize: 100, bufferSize: 100, + forward: NilApatPattern, + cfwd: NilApatPattern, + reverse: NilApatPattern, + crev: NilApatPattern, } opt := Options{&o} @@ -126,19 +136,41 @@ func OptionMaxLength(maxLength int) WithOption { // OptionForwardError sets the number of // error allowed when matching the forward // primer. -func OptionForwardError(max int) WithOption { +func OptionForwardPrimer(primer string, max int) WithOption { f := WithOption(func(opt Options) { + var err error + + opt.pointer.forward, err = MakeApatPattern(primer, max) + if err != nil { + log.Fatalf("error : %v\n", err) + } + + opt.pointer.cfwd, err = opt.pointer.forward.ReverseComplement() + if err != nil { + log.Fatalf("error : %v\n", err) + } opt.pointer.forwardError = max }) return f } -// OptionReverseError sets the number of -// error allowed when matching the reverse +// OptionForwardError sets the number of +// error allowed when matching the forward // primer. -func OptionReverseError(max int) WithOption { +func OptionReversePrimer(primer string, max int) WithOption { f := WithOption(func(opt Options) { + var err error + + opt.pointer.reverse, err = MakeApatPattern(primer, max) + if err != nil { + log.Fatalf("error : %v\n", err) + } + + opt.pointer.crev, err = opt.pointer.reverse.ReverseComplement() + if err != nil { + log.Fatalf("error : %v\n", err) + } opt.pointer.reverseError = max }) @@ -185,14 +217,37 @@ func OptionBatchSize(size int) WithOption { return f } -func _Pcr(seq ApatSequence, sequence obiseq.BioSequence, - forward, cfwd, reverse, crev ApatPattern, +func (options Options) Free() { + if options.pointer.forward.pointer != nil { + options.pointer.forward.Free() + } + + if options.pointer.cfwd.pointer != nil { + options.pointer.cfwd.Free() + } + + if options.pointer.reverse.pointer != nil { + options.pointer.reverse.Free() + } + + if options.pointer.crev.pointer != nil { + options.pointer.crev.Free() + } +} + +func _Pcr(seq ApatSequence, + sequence obiseq.BioSequence, opt Options) obiseq.BioSequenceSlice { results := make(obiseq.BioSequenceSlice, 0, 10) + forward := opt.pointer.forward + cfwd := opt.pointer.cfwd + reverse := opt.pointer.reverse + crev := opt.pointer.crev + forwardMatches := forward.FindAllIndex(seq) - if forwardMatches != nil { + if len(forwardMatches) > 0 { begin := forwardMatches[0][0] length := seq.Length() - begin @@ -262,7 +317,6 @@ func _Pcr(seq ApatSequence, sequence obiseq.BioSequence, } forwardMatches = reverse.FindAllIndex(seq) - if forwardMatches != nil { begin := forwardMatches[0][0] @@ -340,28 +394,15 @@ func _Pcr(seq ApatSequence, sequence obiseq.BioSequence, // obiseq.BioSequence instance. PCR parameters are // specified using the corresponding Option functions // defined for the PCR algorithm. -func PCR(sequence obiseq.BioSequence, - forward, reverse string, options ...WithOption) obiseq.BioSequenceSlice { +func PCR(sequence obiseq.BioSequence, options ...WithOption) obiseq.BioSequenceSlice { opt := MakeOptions(options) + defer opt.Free() seq, _ := MakeApatSequence(sequence, opt.Circular()) + defer seq.Free() - fwd, _ := MakeApatPattern(forward, opt.ForwardError()) - rev, _ := MakeApatPattern(reverse, opt.ReverseError()) - cfwd, _ := fwd.ReverseComplement() - crev, _ := rev.ReverseComplement() - - results := _Pcr(seq, sequence, - fwd, cfwd, rev, crev, - opt) - - seq.Free() - - fwd.Free() - rev.Free() - cfwd.Free() - crev.Free() + results := _Pcr(seq, sequence, opt) return results } @@ -372,32 +413,24 @@ func PCR(sequence obiseq.BioSequence, // specified using the corresponding Option functions // defined for the PCR algorithm. func PCRSlice(sequences obiseq.BioSequenceSlice, - forward, reverse string, options ...WithOption) obiseq.BioSequenceSlice { + options ...WithOption) obiseq.BioSequenceSlice { results := make(obiseq.BioSequenceSlice, 0, len(sequences)) opt := MakeOptions(options) - - fwd, _ := MakeApatPattern(forward, opt.ForwardError()) - rev, _ := MakeApatPattern(reverse, opt.ReverseError()) - cfwd, _ := fwd.ReverseComplement() - crev, _ := rev.ReverseComplement() + defer opt.Free() if len(sequences) > 0 { seq, _ := MakeApatSequence(sequences[0], opt.Circular()) - amplicons := _Pcr(seq, sequences[0], - fwd, cfwd, rev, crev, - opt) + amplicons := _Pcr(seq, sequences[0], opt) if len(amplicons) > 0 { results = append(results, amplicons...) } for _, sequence := range sequences[1:] { - seq, _ := MakeApatSequence(sequence, opt.Circular(), seq) - amplicons = _Pcr(seq, sequence, - fwd, cfwd, rev, crev, - opt) + seq, _ = MakeApatSequence(sequence, opt.Circular(), seq) + amplicons = _Pcr(seq, sequence, opt) if len(amplicons) > 0 { results = append(results, amplicons...) } @@ -406,21 +439,15 @@ func PCRSlice(sequences obiseq.BioSequenceSlice, seq.Free() } - fwd.Free() - rev.Free() - cfwd.Free() - crev.Free() - return results } // PCRSliceWorker is a worker function builder which produce // job function usable by the obiseq.MakeISliceWorker function. -func PCRSliceWorker(forward, reverse string, - options ...WithOption) obiseq.SeqSliceWorker { +func PCRSliceWorker(options ...WithOption) obiseq.SeqSliceWorker { worker := func(sequences obiseq.BioSequenceSlice) obiseq.BioSequenceSlice { - return PCRSlice(sequences, forward, reverse, options...) + return PCRSlice(sequences, options...) } return worker diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index 16efa66..a97897b 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -184,8 +184,8 @@ func ReadEMBLBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenceB newIter := obiseq.MakeIBioSequenceBatch(opt.BufferSize()) - // newIter.Add(opt.ParallelWorkers()) - newIter.Add(2) + nworkers := opt.ParallelWorkers() + newIter.Add(nworkers) go func() { newIter.Wait() @@ -196,7 +196,7 @@ func ReadEMBLBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenceB }() // for j := 0; j < opt.ParallelWorkers(); j++ { - for j := 0; j < 2; j++ { + for j := 0; j < nworkers; j++ { go _ParseEmblFile(entry_channel, newIter) } diff --git a/pkg/obiformats/fastseq_obi_header.go b/pkg/obiformats/fastseq_obi_header.go index 950c896..5faf6cf 100644 --- a/pkg/obiformats/fastseq_obi_header.go +++ b/pkg/obiformats/fastseq_obi_header.go @@ -173,16 +173,11 @@ func __is_false__(text []byte) bool { bytes.Equal(text, __FALSE__) } -func ParseFastSeqOBIHeader(sequence obiseq.BioSequence) { - definition := []byte(sequence.Definition()) - annotations := sequence.Annotations() - - // all_matches := __obi_header_pattern__.FindAllSubmatchIndex(definition, -1) +func ParseOBIFeatures(text string, annotations obiseq.Annotation) string { + definition := []byte(text) d := definition - //for m := __obi_header_key_pattern__.FindIndex(definition); len(m) > 0; { - //fmt.Println(string(definition[0:20]), __match__key__(definition)) for m := __match__key__(definition); len(m) > 0; { var bvalue []byte var value interface{} @@ -263,7 +258,16 @@ func ParseFastSeqOBIHeader(sequence obiseq.BioSequence) { m = __match__key__(d) } - sequence.SetDefinition(string(bytes.TrimSpace(d))) + return string(bytes.TrimSpace(d)) +} + +func ParseFastSeqOBIHeader(sequence obiseq.BioSequence) { + annotations := sequence.Annotations() + + definition := ParseOBIFeatures(sequence.Definition(), + annotations) + + sequence.SetDefinition(definition) } func FormatFastSeqOBIHeader(sequence obiseq.BioSequence) string { diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 7fa0de9..9171534 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -120,10 +120,10 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } log.Println("Start of the fasta file writing") + go ff(iterator) for i := 0; i < nwriters-1; i++ { go ff(iterator.Split()) } - go ff(iterator) next_to_send := 0 received := make(map[int]FileChunck, 100) diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 620bc53..a28575a 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -122,10 +122,10 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } log.Println("Start of the fastq file writing") + go ff(iterator) for i := 0; i < nwriters-1; i++ { go ff(iterator.Split()) } - go ff(iterator) next_to_send := 0 received := make(map[int]FileChunck, 100) diff --git a/pkg/obiformats/ngsfilter_read.go b/pkg/obiformats/ngsfilter_read.go new file mode 100644 index 0000000..c7053f3 --- /dev/null +++ b/pkg/obiformats/ngsfilter_read.go @@ -0,0 +1,133 @@ +package obiformats + +import ( + "bufio" + "fmt" + "io" + "strings" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + +type PrimerPair struct { + Forward string + Reverse string +} + +type TagPair struct { + Forward string + Reverse string +} + +type PCR struct { + Experiment string + Sample string + Partial bool + Annotations obiseq.Annotation +} + +type PCRs map[TagPair]PCR +type NGSFilter map[PrimerPair]PCRs + +func _readLines(reader io.Reader) []string { + r := bufio.NewReader(reader) + bytes := []byte{} + lines := []string{} + for { + line, isPrefix, err := r.ReadLine() + if err != nil { + break + } + bytes = append(bytes, line...) + if !isPrefix { + str := strings.TrimSpace(string(bytes)) + if len(str) > 0 { + lines = append(lines, str) + bytes = []byte{} + } + } + } + if len(bytes) > 0 { + lines = append(lines, string(bytes)) + } + return lines +} + +func _parseMainNGSFilterTags(text string) TagPair { + + tags := strings.Split(text, ":") + + if len(tags) == 1 { + return TagPair{tags[0], tags[0]} + } + + if tags[0] == "-" { + tags[0] = "" + } + + if tags[1] == "-" { + tags[1] = "" + } + + return TagPair{tags[0], tags[1]} +} + +func _parseMainNGSFilter(text string) (PrimerPair, TagPair, string, string, bool) { + fields := strings.Fields(text) + + tags := _parseMainNGSFilterTags(fields[2]) + partial := fields[5] == "T" || fields[5] == "t" + + return PrimerPair{fields[3], fields[4]}, + tags, + fields[0], + fields[1], + partial +} + +func ReadNGSFilter(reader io.Reader) (NGSFilter, error) { + ngsfilter := make(NGSFilter, 10) + + lines := _readLines(reader) + + for _, line := range lines { + line = strings.TrimSpace(line) + + if strings.HasPrefix(line, "#") || len(line) == 0 { + continue + } + + split := strings.SplitN(line, "@", 2) + + primers, tags, experiment, sample, partial := _parseMainNGSFilter(split[0]) + newPCR := PCR{ + Experiment: experiment, + Sample: sample, + Partial: partial, + Annotations: nil, + } + + if len(split) > 1 && len(split[1]) > 0 { + newPCR.Annotations = obiseq.GetAnnotation() + ParseOBIFeatures(split[1], newPCR.Annotations) + } + + samples, ok := ngsfilter[primers] + + if ok { + pcr, ok := samples[tags] + + if ok { + return nil, fmt.Errorf("pair of tags %v used for samples %s in %s and %s in %s", + tags, sample, experiment, pcr.Sample, pcr.Experiment) + } + + samples[tags] = newPCR + } else { + ngsfilter[primers] = make(PCRs, 1000) + ngsfilter[primers][tags] = newPCR + } + } + + return ngsfilter, nil +} diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index b080909..68d97da 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -56,16 +56,23 @@ func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) { - var newIter obiseq.IBioSequenceBatch - var err error + iterator = iterator.Rebatch(1000) ok := iterator.Next() if ok { - iterator.PushBack() batch := iterator.Get() - if batch.Slice()[0].HasQualities() { - newIter, err = WriteFastqBatch(iterator, file, options...) + iterator.PushBack() + + var newIter obiseq.IBioSequenceBatch + var err error + + if len(batch.Slice()) > 0 { + if batch.Slice()[0].HasQualities() { + newIter, err = WriteFastqBatch(iterator, file, options...) + } else { + newIter, err = WriteFastaBatch(iterator, file, options...) + } } else { newIter, err = WriteFastaBatch(iterator, file, options...) } diff --git a/pkg/obitools/obimultiplex/options.go b/pkg/obitools/obimultiplex/options.go new file mode 100644 index 0000000..19db7ca --- /dev/null +++ b/pkg/obitools/obimultiplex/options.go @@ -0,0 +1 @@ +package obimultiplex diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index 02b3285..575973b 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -10,12 +10,18 @@ import ( // obiseq.BioSequenceBatch containing the selected amplicon sequences. func PCR(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequenceBatch, error) { - forward := ForwardPrimer() - reverse := ReversePrimer() opts := make([]obiapat.WithOption, 0, 10) - opts = append(opts, obiapat.OptionForwardError(AllowedMismatch()), - obiapat.OptionReverseError(AllowedMismatch())) + opts = append(opts, + obiapat.OptionForwardPrimer( + ForwardPrimer(), + AllowedMismatch(), + ), + obiapat.OptionReversePrimer( + ReversePrimer(), + AllowedMismatch(), + ), + ) if MinLength() > 0 { opts = append(opts, obiapat.OptionMinLength(MinLength())) @@ -29,7 +35,7 @@ func PCR(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequenceBatch, error) { opts = append(opts, obiapat.OptionCircular(Circular())) } - worker := obiapat.PCRSliceWorker(forward, reverse, opts...) + worker := obiapat.PCRSliceWorker(opts...) return iterator.MakeISliceWorker(worker), nil }