From eaf65fbcce8ed3f1380c9af2b822f75ffa307b2b Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 24 Feb 2022 07:08:40 +0100 Subject: [PATCH] Some code refactoring, a new version of obiuniq more efficient in memory and a first make file allowing to build obitools --- cmd/obitools/obicount/main.go | 17 +- pkg/obiapat/pcr.go | 3 +- pkg/obichunk/chunk_on_disk.go | 11 +- pkg/obichunk/chunks.go | 9 +- pkg/obichunk/subchunks.go | 13 +- pkg/obichunk/unique.go | 21 +- pkg/obiformats/dispatcher.go | 8 +- pkg/obiformats/ecopcr_read.go | 17 +- pkg/obiformats/embl_read.go | 17 +- pkg/obiformats/fastseq_header.go | 9 +- pkg/obiformats/fastseq_read.go | 24 +- pkg/obiformats/fastseq_write_fasta.go | 28 +- pkg/obiformats/fastseq_write_fastq.go | 26 +- pkg/obiformats/options.go | 7 +- pkg/obiformats/universal_read.go | 14 +- pkg/obiformats/universal_write.go | 26 +- pkg/obiiter/batch.go | 53 ++ pkg/obiiter/batchiterator.go | 560 ++++++++++++++++++ pkg/{obiseq => obiiter}/distribute.go | 16 +- pkg/{obiseq => obiiter}/iterator.go | 20 +- pkg/obiiter/merge.go | 50 ++ pkg/obiiter/pairedbatchiterator.go | 221 +++++++ pkg/obiiter/pipe.go | 46 ++ pkg/{obiseq => obiiter}/speed.go | 11 +- pkg/{obiseq => obiiter}/workers.go | 83 ++- pkg/obingslibrary/worker.go | 9 +- ...{batchiterator.go => batchiterator.go.old} | 0 pkg/obiseq/merge.go | 38 -- ...iterator.go => pairedbatchiterator.go.old} | 0 pkg/obiseq/predicate.go | 37 ++ pkg/obiseq/worker.go | 2 + pkg/obitools/obiconvert/sequence_reader.go | 18 +- pkg/obitools/obiconvert/sequence_writer.go | 14 +- pkg/obitools/obidistribute/distribute.go | 4 +- pkg/obitools/obimultiplex/demultiplex.go | 5 +- pkg/obitools/obipairing/options.go | 8 +- pkg/obitools/obipairing/pairing.go | 13 +- pkg/obitools/obipcr/pcr.go | 4 +- pkg/obitools/obiuniq/unique.go | 4 +- 39 files changed, 1225 insertions(+), 241 deletions(-) create mode 100644 pkg/obiiter/batch.go create mode 100644 pkg/obiiter/batchiterator.go rename pkg/{obiseq => obiiter}/distribute.go (79%) rename pkg/{obiseq => obiiter}/iterator.go (92%) create mode 100644 pkg/obiiter/merge.go create mode 100644 pkg/obiiter/pairedbatchiterator.go create mode 100644 pkg/obiiter/pipe.go rename pkg/{obiseq => obiiter}/speed.go (81%) rename pkg/{obiseq => obiiter}/workers.go (75%) rename pkg/obiseq/{batchiterator.go => batchiterator.go.old} (100%) rename pkg/obiseq/{pairedbatchiterator.go => pairedbatchiterator.go.old} (100%) create mode 100644 pkg/obiseq/worker.go diff --git a/cmd/obitools/obicount/main.go b/cmd/obitools/obicount/main.go index 90893e4..90aaa14 100644 --- a/cmd/obitools/obicount/main.go +++ b/cmd/obitools/obicount/main.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "log" "os" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -34,20 +33,8 @@ func main() { _, args, _ := optionParser(os.Args) - fs, _ := obiconvert.ReadBioSequences(args...) - nread := 0 - nvariant := 0 - nsymbol := 0 - for fs.Next() { - s := fs.Get() - if s==nil { - log.Panicln("Read sequence is nil") - } - nread += s.Count() - nvariant++ - nsymbol += s.Length() - s.Recycle() - } + fs, _ := obiconvert.ReadBioSequencesBatch(args...) + nread, nvariant, nsymbol := fs.Count(true) if obicount.CLIIsPrintingVariantCount() { fmt.Printf(" %d", nvariant) diff --git a/pkg/obiapat/pcr.go b/pkg/obiapat/pcr.go index 441412b..d2e2cd2 100644 --- a/pkg/obiapat/pcr.go +++ b/pkg/obiapat/pcr.go @@ -4,6 +4,7 @@ import ( "log" "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -429,7 +430,7 @@ func PCRSlice(sequences obiseq.BioSequenceSlice, // PCRSliceWorker is a worker function builder which produce // job function usable by the obiseq.MakeISliceWorker function. -func PCRSliceWorker(options ...WithOption) obiseq.SeqSliceWorker { +func PCRSliceWorker(options ...WithOption) obiiter.SeqSliceWorker { opt := MakeOptions(options) worker := func(sequences obiseq.BioSequenceSlice) obiseq.BioSequenceSlice { diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go index 5635e44..6529f0c 100644 --- a/pkg/obichunk/chunk_on_disk.go +++ b/pkg/obichunk/chunk_on_disk.go @@ -8,6 +8,7 @@ import ( "path/filepath" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -33,12 +34,12 @@ func find(root, ext string) []string { return a } -func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, +func ISequenceChunkOnDisk(iterator obiiter.IBioSequenceBatch, classifier *obiseq.BioSequenceClassifier, - sizes ...int) (obiseq.IBioSequenceBatch, error) { + sizes ...int) (obiiter.IBioSequenceBatch, error) { dir, err := tempDir() if err != nil { - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } bufferSize := iterator.BufferSize() @@ -47,7 +48,7 @@ func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, bufferSize = sizes[0] } - newIter := obiseq.MakeIBioSequenceBatch(bufferSize) + newIter := obiiter.MakeIBioSequenceBatch(bufferSize) newIter.Add(1) @@ -86,7 +87,7 @@ func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, b.Recycle() } - newIter.Push(obiseq.MakeBioSequenceBatch(order, chunck)) + newIter.Push(obiiter.MakeBioSequenceBatch(order, chunck)) } diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go index 54c7bc6..8d04597 100644 --- a/pkg/obichunk/chunks.go +++ b/pkg/obichunk/chunks.go @@ -4,12 +4,13 @@ import ( "log" "sync" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func ISequenceChunk(iterator obiseq.IBioSequenceBatch, +func ISequenceChunk(iterator obiiter.IBioSequenceBatch, classifier *obiseq.BioSequenceClassifier, - sizes ...int) (obiseq.IBioSequenceBatch, error) { + sizes ...int) (obiiter.IBioSequenceBatch, error) { bufferSize := iterator.BufferSize() @@ -17,7 +18,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, bufferSize = sizes[0] } - newIter := obiseq.MakeIBioSequenceBatch(bufferSize) + newIter := obiiter.MakeIBioSequenceBatch(bufferSize) newIter.Add(1) @@ -64,7 +65,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, for _, chunck := range chunks { if len(*chunck) > 0 { - newIter.Push(obiseq.MakeBioSequenceBatch(order, *chunck)) + newIter.Push(obiiter.MakeBioSequenceBatch(order, *chunck)) order++ } diff --git a/pkg/obichunk/subchunks.go b/pkg/obichunk/subchunks.go index 281d588..bd5253a 100644 --- a/pkg/obichunk/subchunks.go +++ b/pkg/obichunk/subchunks.go @@ -5,6 +5,7 @@ import ( "sort" "sync/atomic" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -54,9 +55,9 @@ func (by _By) Sort(seqs []sSS) { // End of the sort interface // -func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, +func ISequenceSubChunk(iterator obiiter.IBioSequenceBatch, classifier *obiseq.BioSequenceClassifier, - sizes ...int) (obiseq.IBioSequenceBatch, error) { + sizes ...int) (obiiter.IBioSequenceBatch, error) { bufferSize := iterator.BufferSize() nworkers := 4 @@ -69,7 +70,7 @@ func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, bufferSize = sizes[1] } - newIter := obiseq.MakeIBioSequenceBatch(bufferSize) + newIter := obiiter.MakeIBioSequenceBatch(bufferSize) newIter.Add(nworkers) @@ -86,7 +87,7 @@ func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, return neworder } - ff := func(iterator obiseq.IBioSequenceBatch, + ff := func(iterator obiiter.IBioSequenceBatch, classifier *obiseq.BioSequenceClassifier) { ordered := make([]sSS, 100) @@ -121,7 +122,7 @@ func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, ss := obiseq.MakeBioSequenceSlice() for i, v := range ordered { if v.code != last { - newIter.Push(obiseq.MakeBioSequenceBatch(nextOrder(), ss)) + newIter.Push(obiiter.MakeBioSequenceBatch(nextOrder(), ss)) ss = obiseq.MakeBioSequenceSlice() last = v.code } @@ -131,7 +132,7 @@ func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch, } if len(ss) > 0 { - newIter.Push(obiseq.MakeBioSequenceBatch(nextOrder(), ss)) + newIter.Push(obiiter.MakeBioSequenceBatch(nextOrder(), ss)) } } else { newIter.Push(batch.Reorder(nextOrder())) diff --git a/pkg/obichunk/unique.go b/pkg/obichunk/unique.go index 4d91627..68eabfe 100644 --- a/pkg/obichunk/unique.go +++ b/pkg/obichunk/unique.go @@ -3,26 +3,27 @@ package obichunk import ( "sync" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func IUniqueSequence(iterator obiseq.IBioSequenceBatch, - options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func IUniqueSequence(iterator obiiter.IBioSequenceBatch, + options ...WithOption) (obiiter.IBioSequenceBatch, error) { var err error opts := MakeOptions(options) nworkers := opts.ParallelWorkers() - iUnique := obiseq.MakeIBioSequenceBatch(opts.BufferSize()) + iUnique := obiiter.MakeIBioSequenceBatch(opts.BufferSize()) if opts.SortOnDisk() { nworkers = 1 iterator, err = ISequenceChunkOnDisk(iterator, obiseq.HashClassifier(opts.BatchCount()), - opts.BufferSize()) + 0) if err != nil { - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } } else { @@ -31,7 +32,7 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, opts.BufferSize()) if err != nil { - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } } @@ -53,12 +54,12 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, return neworder } - var ff func(obiseq.IBioSequenceBatch, *obiseq.BioSequenceClassifier, int) + var ff func(obiiter.IBioSequenceBatch, *obiseq.BioSequenceClassifier, int) cat := opts.Categories() na := opts.NAValue() - ff = func(input obiseq.IBioSequenceBatch, + ff = func(input obiiter.IBioSequenceBatch, classifier *obiseq.BioSequenceClassifier, icat int) { icat-- @@ -67,9 +68,9 @@ func IUniqueSequence(iterator obiseq.IBioSequenceBatch, 1, opts.BufferSize()) - var next obiseq.IBioSequenceBatch + var next obiiter.IBioSequenceBatch if icat >= 0 { - next = obiseq.MakeIBioSequenceBatch(opts.BufferSize()) + next = obiiter.MakeIBioSequenceBatch(opts.BufferSize()) iUnique.Add(1) go ff(next, diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go index cd44a3f..4428f6a 100644 --- a/pkg/obiformats/dispatcher.go +++ b/pkg/obiformats/dispatcher.go @@ -5,15 +5,15 @@ import ( "log" "sync" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" ) -type SequenceBatchWriterToFile func(iterator obiseq.IBioSequenceBatch, +type SequenceBatchWriterToFile func(iterator obiiter.IBioSequenceBatch, filename string, - options ...WithOption) (obiseq.IBioSequenceBatch, error) + options ...WithOption) (obiiter.IBioSequenceBatch, error) func WriterDispatcher(prototypename string, - dispatcher obiseq.IDistribute, + dispatcher obiiter.IDistribute, formater SequenceBatchWriterToFile, options ...WithOption) { diff --git a/pkg/obiformats/ecopcr_read.go b/pkg/obiformats/ecopcr_read.go index 10deb52..74192f0 100644 --- a/pkg/obiformats/ecopcr_read.go +++ b/pkg/obiformats/ecopcr_read.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -117,7 +118,7 @@ func __read_ecopcr_bioseq__(file *__ecopcr_file__) (*obiseq.BioSequence, error) return bseq, nil } -func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenceBatch { +func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiiter.IBioSequenceBatch { tag := make([]byte, 11) n, _ := reader.Read(tag) @@ -163,7 +164,7 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenc opt := MakeOptions(options) - newIter := obiseq.MakeIBioSequenceBatch(opt.BufferSize()) + newIter := obiiter.MakeIBioSequenceBatch(opt.BufferSize()) newIter.Add(1) go func() { @@ -181,7 +182,7 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenc slice = append(slice, seq) ii++ if ii >= opt.BatchSize() { - newIter.Push(obiseq.MakeBioSequenceBatch(i, slice)) + newIter.Push(obiiter.MakeBioSequenceBatch(i, slice)) slice = obiseq.MakeBioSequenceSlice() i++ ii = 0 @@ -191,7 +192,7 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenc } if len(slice) > 0 { - newIter.Push(obiseq.MakeBioSequenceBatch(i, slice)) + newIter.Push(obiiter.MakeBioSequenceBatch(i, slice)) } newIter.Done() @@ -205,12 +206,12 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenc return newIter } -func ReadEcoPCR(reader io.Reader, options ...WithOption) obiseq.IBioSequence { +func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { ib := ReadEcoPCRBatch(reader, options...) return ib.SortBatches().IBioSequence() } -func ReadEcoPCRBatchFromFile(filename string, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func ReadEcoPCRBatchFromFile(filename string, options ...WithOption) (obiiter.IBioSequenceBatch, error) { var reader io.Reader var greader io.Reader var err error @@ -218,7 +219,7 @@ func ReadEcoPCRBatchFromFile(filename string, options ...WithOption) (obiseq.IBi reader, err = os.Open(filename) if err != nil { log.Printf("open file error: %+v", err) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } // Test if the flux is compressed by gzip @@ -230,7 +231,7 @@ func ReadEcoPCRBatchFromFile(filename string, options ...WithOption) (obiseq.IBi return ReadEcoPCRBatch(reader, options...), nil } -func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiseq.IBioSequence, error) { +func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { ib, err := ReadEcoPCRBatchFromFile(filename, options...) return ib.SortBatches().IBioSequence(), err diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index 6d2aa96..2cb966f 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -80,7 +81,7 @@ func _EndOfLastEntry(buff []byte) int { return -1 } -func _ParseEmblFile(input <-chan _FileChunk, out obiseq.IBioSequenceBatch) { +func _ParseEmblFile(input <-chan _FileChunk, out obiiter.IBioSequenceBatch) { for chunks := range input { scanner := bufio.NewScanner(chunks.raw) @@ -139,7 +140,7 @@ func _ParseEmblFile(input <-chan _FileChunk, out obiseq.IBioSequenceBatch) { seqBytes = new(bytes.Buffer) } } - out.Push(obiseq.MakeBioSequenceBatch(order, sequences)) + out.Push(obiiter.MakeBioSequenceBatch(order, sequences)) } out.Done() @@ -176,11 +177,11 @@ func _ReadFlatFileChunk(reader io.Reader, readers chan _FileChunk) { // 6 5 43 2 1 // ?//? -func ReadEMBLBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenceBatch { +func ReadEMBLBatch(reader io.Reader, options ...WithOption) obiiter.IBioSequenceBatch { opt := MakeOptions(options) entry_channel := make(chan _FileChunk, opt.BufferSize()) - newIter := obiseq.MakeIBioSequenceBatch(opt.BufferSize()) + newIter := obiiter.MakeIBioSequenceBatch(opt.BufferSize()) nworkers := opt.ParallelWorkers() newIter.Add(nworkers) @@ -199,12 +200,12 @@ func ReadEMBLBatch(reader io.Reader, options ...WithOption) obiseq.IBioSequenceB return newIter } -func ReadEMBL(reader io.Reader, options ...WithOption) obiseq.IBioSequence { +func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { ib := ReadEMBLBatch(reader, options...) return ib.SortBatches().IBioSequence() } -func ReadEMBLBatchFromFile(filename string, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func ReadEMBLBatchFromFile(filename string, options ...WithOption) (obiiter.IBioSequenceBatch, error) { var reader io.Reader var greader io.Reader var err error @@ -212,7 +213,7 @@ func ReadEMBLBatchFromFile(filename string, options ...WithOption) (obiseq.IBioS reader, err = os.Open(filename) if err != nil { log.Printf("open file error: %+v", err) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } // Test if the flux is compressed by gzip @@ -224,7 +225,7 @@ func ReadEMBLBatchFromFile(filename string, options ...WithOption) (obiseq.IBioS return ReadEMBLBatch(reader, options...), nil } -func ReadEMBLFromFile(filename string, options ...WithOption) (obiseq.IBioSequence, error) { +func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { ib, err := ReadEMBLBatchFromFile(filename, options...) return ib.SortBatches().IBioSequence(), err diff --git a/pkg/obiformats/fastseq_header.go b/pkg/obiformats/fastseq_header.go index e7fead7..f32fd87 100644 --- a/pkg/obiformats/fastseq_header.go +++ b/pkg/obiformats/fastseq_header.go @@ -3,6 +3,7 @@ package obiformats import ( "strings" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -14,14 +15,16 @@ func ParseGuessedFastSeqHeader(sequence *obiseq.BioSequence) { } } -func IParseFastSeqHeaderBatch(iterator obiseq.IBioSequenceBatch, options ...WithOption) obiseq.IBioSequenceBatch { +func IParseFastSeqHeaderBatch(iterator obiiter.IBioSequenceBatch, + options ...WithOption) obiiter.IBioSequenceBatch { opt := MakeOptions(options) - return iterator.MakeIWorker(obiseq.AnnotatorToSeqWorker(opt.ParseFastSeqHeader()), + return iterator.MakeIWorker(obiiter.AnnotatorToSeqWorker(opt.ParseFastSeqHeader()), opt.ParallelWorkers(), opt.BufferSize()) } -func IParseFastSeqHeader(iterator obiseq.IBioSequence, options ...WithOption) obiseq.IBioSequence { +func IParseFastSeqHeader(iterator obiiter.IBioSequence, + options ...WithOption) obiiter.IBioSequence { opt := MakeOptions(options) return IParseFastSeqHeaderBatch(iterator.IBioSequenceBatch(opt.BatchSize(), diff --git a/pkg/obiformats/fastseq_read.go b/pkg/obiformats/fastseq_read.go index 3561f19..459bc4d 100644 --- a/pkg/obiformats/fastseq_read.go +++ b/pkg/obiformats/fastseq_read.go @@ -13,11 +13,12 @@ import ( "unsafe" "git.metabarcoding.org/lecasofts/go/obitools/pkg/cutils" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) func _FastseqReader(seqfile C.fast_kseq_p, - iterator obiseq.IBioSequenceBatch, + iterator obiiter.IBioSequenceBatch, batch_size int) { var comment string i := 0 @@ -63,7 +64,7 @@ func _FastseqReader(seqfile C.fast_kseq_p, // log.Printf("\n==> Pushing sequence batch\n") // start := time.Now() - iterator.Push(obiseq.MakeBioSequenceBatch(i, slice)) + iterator.Push(obiiter.MakeBioSequenceBatch(i, slice)) // elapsed := time.Since(start) // log.Printf("\n==>sequences pushed after %s\n", elapsed) @@ -73,13 +74,13 @@ func _FastseqReader(seqfile C.fast_kseq_p, } } if len(slice) > 0 { - iterator.Push(obiseq.MakeBioSequenceBatch(i, slice)) + iterator.Push(obiiter.MakeBioSequenceBatch(i, slice)) } iterator.Done() } -func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiiter.IBioSequenceBatch, error) { opt := MakeOptions(options) name := C.CString(filename) @@ -92,7 +93,7 @@ func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiseq.IB if pointer == nil { err = fmt.Errorf("cannot open file %s", filename) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } size := int64(-1) @@ -104,7 +105,7 @@ func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiseq.IB size = -1 } - newIter := obiseq.MakeIBioSequenceBatch(opt.BufferSize()) + newIter := obiiter.MakeIBioSequenceBatch(opt.BufferSize()) newIter.Add(1) go func() { @@ -124,14 +125,14 @@ func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiseq.IB return newIter, err } -func ReadFastSeqFromFile(filename string, options ...WithOption) (obiseq.IBioSequence, error) { +func ReadFastSeqFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { ib, err := ReadFastSeqBatchFromFile(filename, options...) return ib.SortBatches().IBioSequence(), err } -func ReadFastSeqBatchFromStdin(options ...WithOption) obiseq.IBioSequenceBatch { +func ReadFastSeqBatchFromStdin(options ...WithOption) obiiter.IBioSequenceBatch { opt := MakeOptions(options) - newIter := obiseq.MakeIBioSequenceBatch(opt.BufferSize()) + newIter := obiiter.MakeIBioSequenceBatch(opt.BufferSize()) newIter.Add(1) @@ -139,12 +140,13 @@ func ReadFastSeqBatchFromStdin(options ...WithOption) obiseq.IBioSequenceBatch { newIter.WaitAndClose() }() - go _FastseqReader(C.open_fast_sek_stdin(C.int32_t(opt.QualityShift())), newIter, opt.BatchSize()) + go _FastseqReader(C.open_fast_sek_stdin(C.int32_t(opt.QualityShift())), + newIter, opt.BatchSize()) return newIter } -func ReadFastSeqFromStdin(options ...WithOption) obiseq.IBioSequence { +func ReadFastSeqFromStdin(options ...WithOption) obiiter.IBioSequence { ib := ReadFastSeqBatchFromStdin(options...) return ib.SortBatches().IBioSequence() } diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index d51fb10..f9eb602 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -8,6 +8,7 @@ import ( "os" "strings" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -21,7 +22,7 @@ func min(x, y int) int { func FormatFasta(seq *obiseq.BioSequence, formater FormatHeader) string { var fragments strings.Builder - if seq==nil { + if seq == nil { log.Panicln("try to format a nil BioSequence") } @@ -44,7 +45,7 @@ func FormatFasta(seq *obiseq.BioSequence, formater FormatHeader) string { folded) } -func FormatFastaBatch(batch obiseq.BioSequenceBatch, formater FormatHeader) []byte { +func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader) []byte { var bs bytes.Buffer for _, seq := range batch.Slice() { bs.WriteString(FormatFasta(seq, formater)) @@ -53,7 +54,7 @@ func FormatFastaBatch(batch obiseq.BioSequenceBatch, formater FormatHeader) []by return bs.Bytes() } -func WriteFasta(iterator obiseq.IBioSequence, file io.Writer, options ...WithOption) error { +func WriteFasta(iterator obiiter.IBioSequence, file io.Writer, options ...WithOption) error { opt := MakeOptions(options) header_format := opt.FormatFastSeqHeader() @@ -73,7 +74,7 @@ func WriteFasta(iterator obiseq.IBioSequence, file io.Writer, options ...WithOpt return nil } -func WriteFastaToFile(iterator obiseq.IBioSequence, +func WriteFastaToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) error { @@ -89,16 +90,18 @@ func WriteFastaToFile(iterator obiseq.IBioSequence, return WriteFasta(iterator, file, options...) } -func WriteFastaToStdout(iterator obiseq.IBioSequence, options ...WithOption) error { +func WriteFastaToStdout(iterator obiiter.IBioSequence, options ...WithOption) error { options = append(options, OptionDontCloseFile()) return WriteFasta(iterator, os.Stdout, options...) } -func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func WriteFastaBatch(iterator obiiter.IBioSequenceBatch, + file io.Writer, + options ...WithOption) (obiiter.IBioSequenceBatch, error) { opt := MakeOptions(options) buffsize := iterator.BufferSize() - newIter := obiseq.MakeIBioSequenceBatch(buffsize) + newIter := obiiter.MakeIBioSequenceBatch(buffsize) nwriters := opt.ParallelWorkers() @@ -113,7 +116,7 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options close(chunkchan) }() - ff := func(iterator obiseq.IBioSequenceBatch) { + ff := func(iterator obiiter.IBioSequenceBatch) { for iterator.Next() { batch := iterator.Get() chunkchan <- FileChunck{ @@ -164,20 +167,21 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options return newIter, nil } -func WriteFastaBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func WriteFastaBatchToStdout(iterator obiiter.IBioSequenceBatch, + options ...WithOption) (obiiter.IBioSequenceBatch, error) { options = append(options, OptionDontCloseFile()) return WriteFastaBatch(iterator, os.Stdout, options...) } -func WriteFastaBatchToFile(iterator obiseq.IBioSequenceBatch, +func WriteFastaBatchToFile(iterator obiiter.IBioSequenceBatch, filename string, - options ...WithOption) (obiseq.IBioSequenceBatch, error) { + options ...WithOption) (obiiter.IBioSequenceBatch, error) { file, err := os.Create(filename) if err != nil { log.Fatalf("open file error: %v", err) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } options = append(options, OptionCloseFile()) diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index cd9594a..9520a18 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -8,6 +8,7 @@ import ( "os" "time" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -34,7 +35,7 @@ func FormatFastq(seq *obiseq.BioSequence, quality_shift int, formater FormatHead ) } -func FormatFastqBatch(batch obiseq.BioSequenceBatch, quality_shift int, +func FormatFastqBatch(batch obiiter.BioSequenceBatch, quality_shift int, formater FormatHeader) []byte { var bs bytes.Buffer for _, seq := range batch.Slice() { @@ -44,7 +45,7 @@ func FormatFastqBatch(batch obiseq.BioSequenceBatch, quality_shift int, return bs.Bytes() } -func WriteFastq(iterator obiseq.IBioSequence, file io.Writer, options ...WithOption) error { +func WriteFastq(iterator obiiter.IBioSequence, file io.Writer, options ...WithOption) error { opt := MakeOptions(options) header_format := opt.FormatFastSeqHeader() @@ -65,7 +66,7 @@ func WriteFastq(iterator obiseq.IBioSequence, file io.Writer, options ...WithOpt return nil } -func WriteFastqToFile(iterator obiseq.IBioSequence, +func WriteFastqToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) error { @@ -80,7 +81,7 @@ func WriteFastqToFile(iterator obiseq.IBioSequence, return WriteFastq(iterator, file, options...) } -func WriteFastqToStdout(iterator obiseq.IBioSequence, options ...WithOption) error { +func WriteFastqToStdout(iterator obiiter.IBioSequence, options ...WithOption) error { options = append(options, OptionDontCloseFile()) return WriteFastq(iterator, os.Stdout, options...) } @@ -90,11 +91,13 @@ type FileChunck struct { order int } -func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func WriteFastqBatch(iterator obiiter.IBioSequenceBatch, + file io.Writer, + options ...WithOption) (obiiter.IBioSequenceBatch, error) { opt := MakeOptions(options) buffsize := iterator.BufferSize() - newIter := obiseq.MakeIBioSequenceBatch(buffsize) + newIter := obiiter.MakeIBioSequenceBatch(buffsize) nwriters := opt.ParallelWorkers() @@ -113,7 +116,7 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options close(chunkchan) }() - ff := func(iterator obiseq.IBioSequenceBatch) { + ff := func(iterator obiiter.IBioSequenceBatch) { for iterator.Next() { batch := iterator.Get() chunk := FileChunck{ @@ -165,20 +168,21 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options return newIter, nil } -func WriteFastqBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func WriteFastqBatchToStdout(iterator obiiter.IBioSequenceBatch, + options ...WithOption) (obiiter.IBioSequenceBatch, error) { options = append(options, OptionDontCloseFile()) return WriteFastqBatch(iterator, os.Stdout, options...) } -func WriteFastqBatchToFile(iterator obiseq.IBioSequenceBatch, +func WriteFastqBatchToFile(iterator obiiter.IBioSequenceBatch, filename string, - options ...WithOption) (obiseq.IBioSequenceBatch, error) { + options ...WithOption) (obiiter.IBioSequenceBatch, error) { file, err := os.Create(filename) if err != nil { log.Fatalf("open file error: %v", err) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } options = append(options, OptionCloseFile()) diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 9790a0c..19cc540 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -1,11 +1,12 @@ package obiformats import ( + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) type __options__ struct { - fastseq_header_parser obiseq.SeqAnnotator + fastseq_header_parser obiiter.SeqAnnotator fastseq_header_writer func(*obiseq.BioSequence) string with_progress_bar bool buffer_size int @@ -58,7 +59,7 @@ func (opt Options) ParallelWorkers() int { return opt.pointer.parallel_workers } -func (opt Options) ParseFastSeqHeader() obiseq.SeqAnnotator { +func (opt Options) ParseFastSeqHeader() obiiter.SeqAnnotator { return opt.pointer.fastseq_header_parser } @@ -123,7 +124,7 @@ func OptionsQualitySolexa() WithOption { return OptionsQualityShift(64) } -func OptionsFastSeqHeaderParser(parser obiseq.SeqAnnotator) WithOption { +func OptionsFastSeqHeaderParser(parser obiiter.SeqAnnotator) WithOption { f := WithOption(func(opt Options) { opt.pointer.fastseq_header_parser = parser }) diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index e068dc6..0679e9f 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -8,7 +8,7 @@ import ( "os" "strings" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" ) func GuessSeqFileType(firstline string) string { @@ -36,7 +36,8 @@ func GuessSeqFileType(firstline string) string { } } -func ReadSequencesBatchFromFile(filename string, options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func ReadSequencesBatchFromFile(filename string, + options ...WithOption) (obiiter.IBioSequenceBatch, error) { var file *os.File var reader io.Reader var greader io.Reader @@ -46,7 +47,7 @@ func ReadSequencesBatchFromFile(filename string, options ...WithOption) (obiseq. if err != nil { log.Fatalf("open file error: %v", err) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } reader = file @@ -65,7 +66,7 @@ func ReadSequencesBatchFromFile(filename string, options ...WithOption) (obiseq. tag, _ := breader.Peek(30) if len(tag) < 30 { - newIter := obiseq.MakeIBioSequenceBatch() + newIter := obiiter.MakeIBioSequenceBatch() newIter.Close() return newIter, nil } @@ -89,10 +90,11 @@ func ReadSequencesBatchFromFile(filename string, options ...WithOption) (obiseq. filename, filetype) } - return obiseq.NilIBioSequenceBatch, nil + return obiiter.NilIBioSequenceBatch, nil } -func ReadSequencesFromFile(filename string, options ...WithOption) (obiseq.IBioSequence, error) { +func ReadSequencesFromFile(filename string, + options ...WithOption) (obiiter.IBioSequence, error) { ib, err := ReadSequencesBatchFromFile(filename, options...) return ib.SortBatches().IBioSequence(), err diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index 109bfaa..310444f 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -6,10 +6,10 @@ import ( "log" "os" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" ) -func WriteSequences(iterator obiseq.IBioSequence, +func WriteSequences(iterator obiiter.IBioSequence, file io.Writer, options ...WithOption) error { @@ -34,7 +34,7 @@ func WriteSequences(iterator obiseq.IBioSequence, return nil } -func WriteSequencesToFile(iterator obiseq.IBioSequence, +func WriteSequencesToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) error { @@ -48,13 +48,13 @@ func WriteSequencesToFile(iterator obiseq.IBioSequence, return WriteSequences(iterator, file, options...) } -func WriteSequencesToStdout(iterator obiseq.IBioSequence, options ...WithOption) error { +func WriteSequencesToStdout(iterator obiiter.IBioSequence, options ...WithOption) error { return WriteSequences(iterator, os.Stdout, options...) } -func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, +func WriteSequenceBatch(iterator obiiter.IBioSequenceBatch, file io.Writer, - options ...WithOption) (obiseq.IBioSequenceBatch, error) { + options ...WithOption) (obiiter.IBioSequenceBatch, error) { iterator = iterator.Rebatch(1000) @@ -64,7 +64,7 @@ func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, batch := iterator.Get() iterator.PushBack() - var newIter obiseq.IBioSequenceBatch + var newIter obiiter.IBioSequenceBatch var err error if len(batch.Slice()) > 0 { @@ -84,24 +84,24 @@ func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, return iterator, nil } - return obiseq.NilIBioSequenceBatch, fmt.Errorf("input iterator not ready") + return obiiter.NilIBioSequenceBatch, fmt.Errorf("input iterator not ready") } -func WriteSequencesBatchToStdout(iterator obiseq.IBioSequenceBatch, - options ...WithOption) (obiseq.IBioSequenceBatch, error) { +func WriteSequencesBatchToStdout(iterator obiiter.IBioSequenceBatch, + options ...WithOption) (obiiter.IBioSequenceBatch, error) { options = append(options, OptionDontCloseFile()) return WriteSequenceBatch(iterator, os.Stdout, options...) } -func WriteSequencesBatchToFile(iterator obiseq.IBioSequenceBatch, +func WriteSequencesBatchToFile(iterator obiiter.IBioSequenceBatch, filename string, - options ...WithOption) (obiseq.IBioSequenceBatch, error) { + options ...WithOption) (obiiter.IBioSequenceBatch, error) { file, err := os.Create(filename) if err != nil { log.Fatalf("open file error: %v", err) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } options = append(options, OptionCloseFile()) diff --git a/pkg/obiiter/batch.go b/pkg/obiiter/batch.go new file mode 100644 index 0000000..014a6df --- /dev/null +++ b/pkg/obiiter/batch.go @@ -0,0 +1,53 @@ +package obiiter + +import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + +type BioSequenceBatch struct { + slice obiseq.BioSequenceSlice + order int +} + +var NilBioSequenceBatch = BioSequenceBatch{nil, -1} + +func MakeBioSequenceBatch(order int, + sequences obiseq.BioSequenceSlice) BioSequenceBatch { + + return BioSequenceBatch{ + slice: sequences, + order: order, + } +} + +func (batch BioSequenceBatch) Order() int { + return batch.order +} + +func (batch BioSequenceBatch) Reorder(newOrder int) BioSequenceBatch { + batch.order = newOrder + return batch +} + +func (batch BioSequenceBatch) Slice() obiseq.BioSequenceSlice { + return batch.slice +} + +func (batch BioSequenceBatch) Length() int { + return len(batch.slice) +} + +func (batch BioSequenceBatch) NotEmpty() bool { + return batch.slice.NotEmpty() +} + +func (batch BioSequenceBatch) Pop0() *obiseq.BioSequence { + return batch.slice.Pop0() +} + +func (batch BioSequenceBatch) IsNil() bool { + return batch.slice == nil +} + +func (batch BioSequenceBatch) Recycle() { + batch.slice.Recycle() + batch.slice = nil +} diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go new file mode 100644 index 0000000..b5d1d9f --- /dev/null +++ b/pkg/obiiter/batchiterator.go @@ -0,0 +1,560 @@ +package obiiter + +import ( + "fmt" + "log" + "sync" + "sync/atomic" + "time" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "github.com/tevino/abool/v2" +) + +// Structure implementing an iterator over bioseq.BioSequenceBatch +// based on a channel. +type _IBioSequenceBatch struct { + channel chan BioSequenceBatch + current BioSequenceBatch + pushBack *abool.AtomicBool + all_done *sync.WaitGroup + lock *sync.RWMutex + buffer_size int32 + batch_size int32 + sequence_format string + finished *abool.AtomicBool +} + +type IBioSequenceBatch struct { + pointer *_IBioSequenceBatch +} + +// NilIBioSequenceBatch nil instance for IBioSequenceBatch +// +// NilIBioSequenceBatch is the nil instance for the +// IBioSequenceBatch type. +// +var NilIBioSequenceBatch = IBioSequenceBatch{pointer: nil} + +func MakeIBioSequenceBatch(sizes ...int) IBioSequenceBatch { + buffsize := int32(1) + + if len(sizes) > 0 { + buffsize = int32(sizes[0]) + } + + i := _IBioSequenceBatch{ + channel: make(chan BioSequenceBatch, buffsize), + current: NilBioSequenceBatch, + pushBack: abool.New(), + buffer_size: buffsize, + batch_size: -1, + sequence_format: "", + finished: abool.New(), + } + + waiting := sync.WaitGroup{} + i.all_done = &waiting + lock := sync.RWMutex{} + i.lock = &lock + ii := IBioSequenceBatch{&i} + return ii +} + +func (iterator IBioSequenceBatch) Add(n int) { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.Add method on NilIBioSequenceBatch") + } + + iterator.pointer.all_done.Add(n) +} + +func (iterator IBioSequenceBatch) Done() { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.Done method on NilIBioSequenceBatch") + } + + iterator.pointer.all_done.Done() +} + +func (iterator IBioSequenceBatch) Unlock() { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.Unlock method on NilIBioSequenceBatch") + } + + iterator.pointer.lock.Unlock() +} + +func (iterator IBioSequenceBatch) Lock() { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.Lock method on NilIBioSequenceBatch") + } + + iterator.pointer.lock.Lock() +} + +func (iterator IBioSequenceBatch) RLock() { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.RLock method on NilIBioSequenceBatch") + } + + iterator.pointer.lock.RLock() +} + +func (iterator IBioSequenceBatch) RUnlock() { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.RUnlock method on NilIBioSequenceBatch") + } + + iterator.pointer.lock.RUnlock() +} + +func (iterator IBioSequenceBatch) Wait() { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.Wait method on NilIBioSequenceBatch") + } + + iterator.pointer.all_done.Wait() +} + +func (iterator IBioSequenceBatch) Channel() chan BioSequenceBatch { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.Channel method on NilIBioSequenceBatch") + } + + return iterator.pointer.channel +} + +func (iterator IBioSequenceBatch) IsNil() bool { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.IsNil method on NilIBioSequenceBatch") + } + + return iterator.pointer == nil +} + +func (iterator IBioSequenceBatch) BufferSize() int { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.BufferSize method on NilIBioSequenceBatch") + } + + return int(atomic.LoadInt32(&iterator.pointer.buffer_size)) +} + +func (iterator IBioSequenceBatch) BatchSize() int { + if iterator.pointer == nil { + log.Panic("call of IBioSequenceBatch.BatchSize method on NilIBioSequenceBatch") + } + + return int(atomic.LoadInt32(&iterator.pointer.batch_size)) +} + +func (iterator IBioSequenceBatch) SetBatchSize(size int) error { + if size >= 0 { + atomic.StoreInt32(&iterator.pointer.batch_size, int32(size)) + return nil + } + + return fmt.Errorf("size (%d) cannot be negative", size) +} + +func (iterator IBioSequenceBatch) Split() IBioSequenceBatch { + iterator.pointer.lock.RLock() + defer iterator.pointer.lock.RUnlock() + i := _IBioSequenceBatch{ + channel: iterator.pointer.channel, + current: NilBioSequenceBatch, + pushBack: abool.New(), + all_done: iterator.pointer.all_done, + buffer_size: iterator.pointer.buffer_size, + batch_size: iterator.pointer.batch_size, + sequence_format: iterator.pointer.sequence_format, + finished: iterator.pointer.finished} + lock := sync.RWMutex{} + i.lock = &lock + + newIter := IBioSequenceBatch{&i} + return newIter +} + +func (iterator IBioSequenceBatch) Next() bool { + if iterator.pointer.pushBack.IsSet() { + iterator.pointer.pushBack.UnSet() + return true + } + + if iterator.pointer.finished.IsSet() { + return false + } + + next, ok := (<-iterator.pointer.channel) + + if ok { + iterator.pointer.current = next + return true + } + + iterator.pointer.current = NilBioSequenceBatch + iterator.pointer.finished.Set() + return false +} + +func (iterator IBioSequenceBatch) PushBack() { + if !iterator.pointer.current.IsNil() { + iterator.pointer.pushBack.Set() + } +} + +// The 'Get' method returns the instance of BioSequenceBatch +// currently pointed by the iterator. You have to use the +// 'Next' method to move to the next entry before calling +// 'Get' to retreive the following instance. +func (iterator IBioSequenceBatch) Get() BioSequenceBatch { + return iterator.pointer.current +} + +func (iterator IBioSequenceBatch) Push(batch BioSequenceBatch) { + if batch.IsNil() { + log.Panicln("An Nil batch is pushed on the channel") + } + if batch.Length() == 0 { + log.Panicln("An empty batch is pushed on the channel") + } + + iterator.pointer.channel <- batch +} + +func (iterator IBioSequenceBatch) Close() { + close(iterator.pointer.channel) +} + +func (iterator IBioSequenceBatch) WaitAndClose() { + iterator.Wait() + + for len(iterator.Channel()) > 0 { + time.Sleep(time.Millisecond) + } + iterator.Close() +} + +// Finished returns 'true' value if no more data is available +// from the iterator. +func (iterator IBioSequenceBatch) Finished() bool { + return iterator.pointer.finished.IsSet() +} + +func (iterator IBioSequenceBatch) IBioSequence(sizes ...int) IBioSequence { + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + buffsize = sizes[0] + } + + newIter := MakeIBioSequence(buffsize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.Channel()) + }() + + go func() { + for iterator.Next() { + batch := iterator.Get() + + for batch.NotEmpty() { + newIter.Channel() <- batch.Pop0() + } + batch.Recycle() + } + newIter.Done() + }() + + return newIter +} + +func (iterator IBioSequenceBatch) SortBatches(sizes ...int) IBioSequenceBatch { + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + buffsize = sizes[0] + } + + newIter := MakeIBioSequenceBatch(buffsize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.pointer.channel) + }() + + next_to_send := 0 + received := make(map[int]BioSequenceBatch) + go func() { + for iterator.Next() { + batch := iterator.Get() + if batch.order == next_to_send { + newIter.pointer.channel <- batch + next_to_send++ + batch, ok := received[next_to_send] + for ok { + newIter.pointer.channel <- batch + delete(received, next_to_send) + next_to_send++ + batch, ok = received[next_to_send] + } + } else { + received[batch.order] = batch + } + } + newIter.Done() + }() + + return newIter + +} + +func (iterator IBioSequenceBatch) Concat(iterators ...IBioSequenceBatch) IBioSequenceBatch { + + if len(iterators) == 0 { + return iterator + } + + buffsize := iterator.BufferSize() + newIter := MakeIBioSequenceBatch(buffsize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.Channel()) + }() + + go func() { + previous_max := 0 + max_order := 0 + + for iterator.Next() { + s := iterator.Get() + if s.order > max_order { + max_order = s.order + } + newIter.Push(s.Reorder(s.order + previous_max)) + } + + previous_max = max_order + 1 + for _, iter := range iterators { + for iter.Next() { + s := iter.Get() + if (s.order + previous_max) > max_order { + max_order = s.order + previous_max + } + + newIter.Push(s.Reorder(s.order + previous_max)) + } + previous_max = max_order + 1 + } + newIter.Done() + }() + + return newIter +} + +// Redistributes sequences from a IBioSequenceBatch into a new +// IBioSequenceBatch with every batches having the same size +// indicated in parameter. Rebatching implies to sort the +// source IBioSequenceBatch. +func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBatch { + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + buffsize = sizes[0] + } + + newIter := MakeIBioSequenceBatch(buffsize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.pointer.channel) + }() + + go func() { + order := 0 + iterator = iterator.SortBatches() + buffer := obiseq.MakeBioSequenceSlice() + + for iterator.Next() { + seqs := iterator.Get() + for _, s := range seqs.slice { + buffer = append(buffer, s) + if len(buffer) == size { + newIter.Push(MakeBioSequenceBatch(order, buffer)) + order++ + buffer = obiseq.MakeBioSequenceSlice() + } + } + seqs.Recycle() + } + + if len(buffer) > 0 { + newIter.Push(MakeBioSequenceBatch(order, buffer)) + } + + newIter.Done() + + }() + + return newIter +} + +func (iterator IBioSequenceBatch) Recycle() { + + log.Println("Start recycling of Bioseq objects") + recycled := 0 + for iterator.Next() { + // iterator.Get() + batch := iterator.Get() + for _, seq := range batch.Slice() { + seq.Recycle() + recycled++ + } + batch.Recycle() + } + log.Printf("End of the recycling of %d Bioseq objects", recycled) +} + +func (iterator IBioSequenceBatch) Count(recycle bool) (int, int, int) { + variants := 0 + reads := 0 + nucleotides := 0 + + log.Println("Start counting of Bioseq objects") + for iterator.Next() { + // iterator.Get() + batch := iterator.Get() + for _, seq := range batch.Slice() { + variants++ + reads += seq.Count() + nucleotides += seq.Length() + + if recycle { + seq.Recycle() + } + } + batch.Recycle() + } + log.Printf("End of the counting of %d Bioseq objects", variants) + return variants, reads, nucleotides +} + +func (iterator IBioSequenceBatch) PairWith(reverse IBioSequenceBatch, + sizes ...int) IPairedBioSequenceBatch { + buffsize := iterator.BufferSize() + batchsize := 5000 + + if len(sizes) > 0 { + batchsize = sizes[0] + } + + if len(sizes) > 1 { + buffsize = sizes[1] + } + + iterator = iterator.Rebatch(batchsize) + reverse = reverse.Rebatch(batchsize) + + newIter := MakeIPairedBioSequenceBatch(buffsize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.Channel()) + log.Println("End of association of paired reads") + }() + + log.Println("Start association of paired reads") + go func() { + for iterator.Next() { + if !reverse.Next() { + log.Panicln("Etrange reverse pas prĂȘt") + } + newIter.Channel() <- MakePairedBioSequenceBatch(iterator.Get(), + reverse.Get()) + } + + newIter.Done() + }() + + return newIter +} + +func (iterator IBioSequenceBatch) DivideOn(predicate obiseq.SequencePredicate, + size int, sizes ...int) (IBioSequenceBatch, IBioSequenceBatch) { + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + buffsize = sizes[0] + } + + trueIter := MakeIBioSequenceBatch(buffsize) + falseIter := MakeIBioSequenceBatch(buffsize) + + trueIter.Add(1) + falseIter.Add(1) + + go func() { + trueIter.WaitAndClose() + falseIter.WaitAndClose() + }() + + go func() { + trueOrder := 0 + falseOrder := 0 + iterator = iterator.SortBatches() + + trueSlice := obiseq.MakeBioSequenceSlice() + falseSlice := obiseq.MakeBioSequenceSlice() + + for iterator.Next() { + seqs := iterator.Get() + for _, s := range seqs.slice { + if predicate(s) { + trueSlice = append(trueSlice, s) + } else { + falseSlice = append(falseSlice, s) + } + + if len(trueSlice) == size { + trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) + trueOrder++ + trueSlice = obiseq.MakeBioSequenceSlice() + } + + if len(falseSlice) == size { + falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) + falseOrder++ + falseSlice = obiseq.MakeBioSequenceSlice() + } + } + seqs.Recycle() + } + + if len(trueSlice) > 0 { + trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) + } + + if len(falseSlice) > 0 { + falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) + } + + trueIter.Done() + falseIter.Done() + }() + + return trueIter, falseIter +} diff --git a/pkg/obiseq/distribute.go b/pkg/obiiter/distribute.go similarity index 79% rename from pkg/obiseq/distribute.go rename to pkg/obiiter/distribute.go index e51a971..ba2f496 100644 --- a/pkg/obiseq/distribute.go +++ b/pkg/obiiter/distribute.go @@ -1,14 +1,16 @@ -package obiseq +package obiiter import ( "fmt" "sync" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) type IDistribute struct { outputs map[int]IBioSequenceBatch news chan int - classifier *BioSequenceClassifier + classifier *obiseq.BioSequenceClassifier lock *sync.Mutex } @@ -28,16 +30,16 @@ func (dist *IDistribute) News() chan int { return dist.news } -func (dist *IDistribute) Classifier() *BioSequenceClassifier { +func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier { return dist.classifier } -func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes ...int) IDistribute { +func (iterator IBioSequenceBatch) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute { batchsize := 5000 buffsize := 2 outputs := make(map[int]IBioSequenceBatch, 100) - slices := make(map[int]*BioSequenceSlice, 100) + slices := make(map[int]*obiseq.BioSequenceSlice, 100) orders := make(map[int]int, 100) news := make(chan int) @@ -72,7 +74,7 @@ func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes slice, ok := slices[key] if !ok { - s := MakeBioSequenceSlice() + s := obiseq.MakeBioSequenceSlice() slice = &s slices[key] = slice orders[key] = 0 @@ -89,7 +91,7 @@ func (iterator IBioSequenceBatch) Distribute(class *BioSequenceClassifier, sizes if len(*slice) == batchsize { outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice)) orders[key]++ - s := MakeBioSequenceSlice() + s := obiseq.MakeBioSequenceSlice() slices[key] = &s } } diff --git a/pkg/obiseq/iterator.go b/pkg/obiiter/iterator.go similarity index 92% rename from pkg/obiseq/iterator.go rename to pkg/obiiter/iterator.go index 19e23b4..65b629e 100644 --- a/pkg/obiseq/iterator.go +++ b/pkg/obiiter/iterator.go @@ -1,14 +1,16 @@ -package obiseq +package obiiter import ( "sync" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) // Private structure implementing an iterator over // bioseq.BioSequence based on a channel. type __ibiosequence__ struct { - channel chan *BioSequence - current *BioSequence + channel chan *obiseq.BioSequence + current *obiseq.BioSequence pushBack bool all_done *sync.WaitGroup buffer_size int @@ -38,10 +40,10 @@ func (iterator IBioSequence) Wait() { iterator.pointer.all_done.Wait() } -func (iterator IBioSequence) Channel() chan *BioSequence { +func (iterator IBioSequence) Channel() chan *obiseq.BioSequence { return iterator.pointer.channel } -func (iterator IBioSequence) PChannel() *chan *BioSequence { +func (iterator IBioSequence) PChannel() *chan *obiseq.BioSequence { return &(iterator.pointer.channel) } @@ -53,7 +55,7 @@ func MakeIBioSequence(sizes ...int) IBioSequence { } i := __ibiosequence__{ - channel: make(chan *BioSequence, buffsize), + channel: make(chan *obiseq.BioSequence, buffsize), current: nil, pushBack: false, buffer_size: buffsize, @@ -117,7 +119,7 @@ func (iterator IBioSequence) PushBack() { // currently pointed by the iterator. You have to use the // 'Next' method to move to the next entry before calling // 'Get' to retreive the following instance. -func (iterator IBioSequence) Get() *BioSequence { +func (iterator IBioSequence) Get() *obiseq.BioSequence { return iterator.pointer.current } @@ -161,7 +163,7 @@ func (iterator IBioSequence) IBioSequenceBatch(sizes ...int) IBioSequenceBatch { go func() { for j := 0; !iterator.Finished(); j++ { batch := BioSequenceBatch{ - slice: MakeBioSequenceSlice(), + slice: obiseq.MakeBioSequenceSlice(), order: j} for i := 0; i < batchsize && iterator.Next(); i++ { seq := iterator.Get() @@ -275,7 +277,7 @@ func (iterator IBioSequence) Tail(n int, sizes ...int) IBioSequence { } newIter := MakeIBioSequence(buffsize) - buffseq := MakeBioSequenceSlice() + buffseq := obiseq.MakeBioSequenceSlice() newIter.Add(1) diff --git a/pkg/obiiter/merge.go b/pkg/obiiter/merge.go new file mode 100644 index 0000000..1e91104 --- /dev/null +++ b/pkg/obiiter/merge.go @@ -0,0 +1,50 @@ +package obiiter + +import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + +func (iterator IBioSequenceBatch) IMergeSequenceBatch(na string, statsOn []string, sizes ...int) IBioSequenceBatch { + batchsize := 100 + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + batchsize = sizes[0] + } + if len(sizes) > 1 { + buffsize = sizes[1] + } + + newIter := MakeIBioSequenceBatch(buffsize) + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + for j := 0; !iterator.Finished(); j++ { + batch := BioSequenceBatch{ + slice: obiseq.MakeBioSequenceSlice(), + order: j} + for i := 0; i < batchsize && iterator.Next(); i++ { + seqs := iterator.Get() + batch.slice = append(batch.slice, seqs.slice.Merge(na, statsOn)) + } + if batch.Length() > 0 { + newIter.Push(batch) + } + } + newIter.Done() + }() + + return newIter +} + + +func MergePipe(na string, statsOn []string, sizes ...int) Pipeable { + f := func(iterator IBioSequenceBatch) IBioSequenceBatch { + return iterator.IMergeSequenceBatch(na,statsOn,sizes...) + } + + return f +} \ No newline at end of file diff --git a/pkg/obiiter/pairedbatchiterator.go b/pkg/obiiter/pairedbatchiterator.go new file mode 100644 index 0000000..8222264 --- /dev/null +++ b/pkg/obiiter/pairedbatchiterator.go @@ -0,0 +1,221 @@ +package obiiter + +import ( + "log" + "sync" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + +type PairedBioSequenceBatch struct { + forward obiseq.BioSequenceSlice + reverse obiseq.BioSequenceSlice + order int +} + +var NilPairedBioSequenceBatch = PairedBioSequenceBatch{nil, nil, -1} + +func MakePairedBioSequenceBatch(forward, reverse BioSequenceBatch) PairedBioSequenceBatch { + if forward.order != reverse.order { + log.Fatalf("Forward order : %d and reverse order : %d are not matching", + forward.order, reverse.order) + } + + for i := range reverse.slice { + reverse.slice[i].ReverseComplement(true) + } + + return PairedBioSequenceBatch{ + forward: forward.slice, + reverse: reverse.slice, + order: forward.order, + } +} + +func (batch PairedBioSequenceBatch) Order() int { + return batch.order +} + +func (batch PairedBioSequenceBatch) Reorder(newOrder int) PairedBioSequenceBatch { + batch.order = newOrder + return batch +} + + +func (batch PairedBioSequenceBatch) Length() int { + return len(batch.forward) +} + +func (batch PairedBioSequenceBatch) Forward() obiseq.BioSequenceSlice { + return batch.forward +} + +func (batch PairedBioSequenceBatch) Reverse() obiseq.BioSequenceSlice { + return batch.reverse +} + +func (batch PairedBioSequenceBatch) IsNil() bool { + return batch.forward == nil +} + +// Structure implementing an iterator over bioseq.BioSequenceBatch +// based on a channel. +type __ipairedbiosequencebatch__ struct { + channel chan PairedBioSequenceBatch + current PairedBioSequenceBatch + pushBack bool + all_done *sync.WaitGroup + buffer_size int + finished bool + p_finished *bool +} + +type IPairedBioSequenceBatch struct { + pointer *__ipairedbiosequencebatch__ +} + +var NilIPairedBioSequenceBatch = IPairedBioSequenceBatch{pointer: nil} + +func MakeIPairedBioSequenceBatch(sizes ...int) IPairedBioSequenceBatch { + buffsize := 1 + + if len(sizes) > 0 { + buffsize = sizes[0] + } + + i := __ipairedbiosequencebatch__{ + channel: make(chan PairedBioSequenceBatch, buffsize), + current: NilPairedBioSequenceBatch, + pushBack: false, + buffer_size: buffsize, + finished: false, + p_finished: nil, + } + + i.p_finished = &i.finished + waiting := sync.WaitGroup{} + i.all_done = &waiting + ii := IPairedBioSequenceBatch{&i} + return ii +} + +func (iterator IPairedBioSequenceBatch) Add(n int) { + iterator.pointer.all_done.Add(n) +} + +func (iterator IPairedBioSequenceBatch) Done() { + iterator.pointer.all_done.Done() +} + +func (iterator IPairedBioSequenceBatch) Wait() { + iterator.pointer.all_done.Wait() +} + +func (iterator IPairedBioSequenceBatch) Channel() chan PairedBioSequenceBatch { + return iterator.pointer.channel +} + +func (iterator IPairedBioSequenceBatch) IsNil() bool { + return iterator.pointer == nil +} + +func (iterator IPairedBioSequenceBatch) BufferSize() int { + return iterator.pointer.buffer_size +} + +func (iterator IPairedBioSequenceBatch) Split() IPairedBioSequenceBatch { + i := __ipairedbiosequencebatch__{ + channel: iterator.pointer.channel, + current: NilPairedBioSequenceBatch, + pushBack: false, + all_done: iterator.pointer.all_done, + buffer_size: iterator.pointer.buffer_size, + finished: false, + p_finished: iterator.pointer.p_finished} + newIter := IPairedBioSequenceBatch{&i} + return newIter +} + +func (iterator IPairedBioSequenceBatch) Next() bool { + if *(iterator.pointer.p_finished) { + return false + } + + if iterator.pointer.pushBack { + iterator.pointer.pushBack = false + return true + } + + next, ok := (<-iterator.pointer.channel) + + if ok { + iterator.pointer.current = next + return true + } + + iterator.pointer.current = NilPairedBioSequenceBatch + *iterator.pointer.p_finished = true + return false +} + +func (iterator IPairedBioSequenceBatch) PushBack() { + if !iterator.pointer.current.IsNil() { + iterator.pointer.pushBack = true + } +} + +// The 'Get' method returns the instance of BioSequenceBatch +// currently pointed by the iterator. You have to use the +// 'Next' method to move to the next entry before calling +// 'Get' to retreive the following instance. +func (iterator IPairedBioSequenceBatch) Get() PairedBioSequenceBatch { + return iterator.pointer.current +} + +// Finished returns 'true' value if no more data is available +// from the iterator. +func (iterator IPairedBioSequenceBatch) Finished() bool { + return *iterator.pointer.p_finished +} + +func (iterator IPairedBioSequenceBatch) SortBatches(sizes ...int) IPairedBioSequenceBatch { + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + buffsize = sizes[0] + } + + newIter := MakeIPairedBioSequenceBatch(buffsize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.pointer.channel) + }() + + next_to_send := 0 + received := make(map[int]PairedBioSequenceBatch) + go func() { + for iterator.Next() { + batch := iterator.Get() + if batch.order == next_to_send { + newIter.pointer.channel <- batch + next_to_send++ + batch, ok := received[next_to_send] + for ok { + newIter.pointer.channel <- batch + delete(received, next_to_send) + next_to_send++ + batch, ok = received[next_to_send] + } + } else { + received[batch.order] = batch + } + } + newIter.Done() + }() + + return newIter + +} diff --git a/pkg/obiiter/pipe.go b/pkg/obiiter/pipe.go new file mode 100644 index 0000000..9efbfed --- /dev/null +++ b/pkg/obiiter/pipe.go @@ -0,0 +1,46 @@ +package obiiter + + +type Pipeable func(input IBioSequenceBatch) IBioSequenceBatch + +func Pipeline(start Pipeable,parts ...Pipeable) Pipeable { + p := func (input IBioSequenceBatch) IBioSequenceBatch { + data := start(input) + for _,part := range parts { + data = part(data) + } + return data + } + + return p +} + +func (input IBioSequenceBatch) Pipe(start Pipeable, parts ...Pipeable) IBioSequenceBatch { + p := Pipeline(start,parts...) + return p(input) +} + + +type Teeable func(input IBioSequenceBatch) (IBioSequenceBatch,IBioSequenceBatch) + +func (input IBioSequenceBatch) CopyTee() (IBioSequenceBatch,IBioSequenceBatch) { + first := MakeIBioSequenceBatch() + second:= MakeIBioSequenceBatch() + + first.Add(1) + + go func() { + first.WaitAndClose() + second.Close() + }() + + go func() { + for input.Next() { + b:=input.Get() + first.Push(b) + second.Push(b) + } + }() + + return first,second +} diff --git a/pkg/obiseq/speed.go b/pkg/obiiter/speed.go similarity index 81% rename from pkg/obiseq/speed.go rename to pkg/obiiter/speed.go index 1892525..3c9fd4b 100644 --- a/pkg/obiseq/speed.go +++ b/pkg/obiiter/speed.go @@ -1,4 +1,4 @@ -package obiseq +package obiiter import ( "os" @@ -37,3 +37,12 @@ func (iterator IBioSequenceBatch) Speed() IBioSequenceBatch { return newIter } + + +func SpeedPipe() Pipeable { + f := func(iterator IBioSequenceBatch) IBioSequenceBatch { + return iterator.Speed() + } + + return f +} \ No newline at end of file diff --git a/pkg/obiseq/workers.go b/pkg/obiiter/workers.go similarity index 75% rename from pkg/obiseq/workers.go rename to pkg/obiiter/workers.go index 2352b94..fa9fb4c 100644 --- a/pkg/obiseq/workers.go +++ b/pkg/obiiter/workers.go @@ -1,49 +1,25 @@ -package obiseq +package obiiter import ( "log" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -type SeqAnnotator func(*BioSequence) +type SeqAnnotator func(*obiseq.BioSequence) -type SeqWorker func(*BioSequence) *BioSequence -type SeqSliceWorker func(BioSequenceSlice) BioSequenceSlice +type SeqWorker func(*obiseq.BioSequence) *obiseq.BioSequence +type SeqSliceWorker func(obiseq.BioSequenceSlice) obiseq.BioSequenceSlice func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker { - f := func(seq *BioSequence) *BioSequence { + f := func(seq *obiseq.BioSequence) *obiseq.BioSequence { function(seq) return seq } return f } -func (iterator IBioSequence) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequence { - buffsize := iterator.BufferSize() - if len(sizes) > 0 { - buffsize = sizes[0] - } - - newIter := MakeIBioSequence(buffsize) - - newIter.Add(1) - - go func() { - newIter.Wait() - close(newIter.pointer.channel) - }() - - go func() { - for iterator.Next() { - seq := iterator.Get() - seq = worker(seq) - newIter.pointer.channel <- seq - } - newIter.Done() - }() - - return newIter -} func (iterator IBioSequenceBatch) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequenceBatch { nworkers := 4 @@ -125,3 +101,48 @@ func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes return newIter } + + +func (iterator IBioSequence) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequence { + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + buffsize = sizes[0] + } + + newIter := MakeIBioSequence(buffsize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.pointer.channel) + }() + + go func() { + for iterator.Next() { + seq := iterator.Get() + seq = worker(seq) + newIter.pointer.channel <- seq + } + newIter.Done() + }() + + return newIter +} + +func WorkerPipe(worker SeqWorker, sizes ...int) Pipeable { + f := func(iterator IBioSequenceBatch) IBioSequenceBatch { + return iterator.MakeIWorker(worker,sizes...) + } + + return f +} + +func SliceWorkerPipe(worker SeqSliceWorker, sizes ...int) Pipeable { + f := func(iterator IBioSequenceBatch) IBioSequenceBatch { + return iterator.MakeISliceWorker(worker,sizes...) + } + + return f +} diff --git a/pkg/obingslibrary/worker.go b/pkg/obingslibrary/worker.go index aa1ed89..dd6ecf0 100644 --- a/pkg/obingslibrary/worker.go +++ b/pkg/obingslibrary/worker.go @@ -1,6 +1,9 @@ package obingslibrary -import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +import ( + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) type _Options struct { discardErrors bool @@ -167,7 +170,7 @@ func ExtractBarcodeSlice(ngslibrary NGSLibrary, } func ExtractBarcodeSliceWorker(ngslibrary NGSLibrary, - options ...WithOption) obiseq.SeqSliceWorker { + options ...WithOption) obiiter.SeqSliceWorker { opt := MakeOptions(options) @@ -179,3 +182,5 @@ func ExtractBarcodeSliceWorker(ngslibrary NGSLibrary, return worker } + + diff --git a/pkg/obiseq/batchiterator.go b/pkg/obiseq/batchiterator.go.old similarity index 100% rename from pkg/obiseq/batchiterator.go rename to pkg/obiseq/batchiterator.go.old diff --git a/pkg/obiseq/merge.go b/pkg/obiseq/merge.go index ac1500b..d73e3f6 100644 --- a/pkg/obiseq/merge.go +++ b/pkg/obiseq/merge.go @@ -165,41 +165,3 @@ func (sequences BioSequenceSlice) Merge(na string, statsOn []string) *BioSequenc return seq } - -func (iterator IBioSequenceBatch) IMergeSequenceBatch(na string, statsOn []string, sizes ...int) IBioSequenceBatch { - batchsize := 100 - buffsize := iterator.BufferSize() - - if len(sizes) > 0 { - batchsize = sizes[0] - } - if len(sizes) > 1 { - buffsize = sizes[1] - } - - newIter := MakeIBioSequenceBatch(buffsize) - - newIter.Add(1) - - go func() { - newIter.WaitAndClose() - }() - - go func() { - for j := 0; !iterator.Finished(); j++ { - batch := BioSequenceBatch{ - slice: MakeBioSequenceSlice(), - order: j} - for i := 0; i < batchsize && iterator.Next(); i++ { - seqs := iterator.Get() - batch.slice = append(batch.slice, seqs.slice.Merge(na, statsOn)) - } - if batch.Length() > 0 { - newIter.Push(batch) - } - } - newIter.Done() - }() - - return newIter -} diff --git a/pkg/obiseq/pairedbatchiterator.go b/pkg/obiseq/pairedbatchiterator.go.old similarity index 100% rename from pkg/obiseq/pairedbatchiterator.go rename to pkg/obiseq/pairedbatchiterator.go.old diff --git a/pkg/obiseq/predicate.go b/pkg/obiseq/predicate.go index 786593f..324cc07 100644 --- a/pkg/obiseq/predicate.go +++ b/pkg/obiseq/predicate.go @@ -1,5 +1,12 @@ package obiseq +import ( + "context" + "log" + + "github.com/PaesslerAG/gval" +) + type SequencePredicate func(*BioSequence) bool func (predicate1 SequencePredicate) And(predicate2 SequencePredicate) SequencePredicate { @@ -73,3 +80,33 @@ func IsShorterOrEqualTo(length int) SequencePredicate { return f } + +func ExrpessionPredicat(expression string) SequencePredicate { + + exp, err := gval.Full().NewEvaluable(expression) + + if err != nil { + log.Fatalf("Error in the expression : %s", expression) + } + + f := func(sequence *BioSequence) bool { + value, err := exp.EvalBool(context.Background(), + map[string]interface{}{ + "annot": sequence.Annotations(), + "count": sequence.Count(), + "length": sequence.Length(), + "sequence": sequence, + }, + ) + + if err != nil { + log.Fatalf("Expression '%s' cannot be evaluated on sequence %s", + expression, + sequence.Id()) + } + + return value + } + + return f +} diff --git a/pkg/obiseq/worker.go b/pkg/obiseq/worker.go new file mode 100644 index 0000000..cff335f --- /dev/null +++ b/pkg/obiseq/worker.go @@ -0,0 +1,2 @@ +package obiseq + diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index b8b47c6..152efb2 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -7,8 +7,8 @@ import ( "strings" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { @@ -66,9 +66,9 @@ func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { return list_of_files, nil } -func ReadBioSequencesBatch(filenames ...string) (obiseq.IBioSequenceBatch, error) { - var iterator obiseq.IBioSequenceBatch - var reader func(string, ...obiformats.WithOption) (obiseq.IBioSequenceBatch, error) +func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, error) { + var iterator obiiter.IBioSequenceBatch + var reader func(string, ...obiformats.WithOption) (obiiter.IBioSequenceBatch, error) opts := make([]obiformats.WithOption, 0, 10) @@ -106,7 +106,7 @@ func ReadBioSequencesBatch(filenames ...string) (obiseq.IBioSequenceBatch, error list_of_files, err := _ExpandListOfFiles(false, filenames...) if err != nil { - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } switch InputFormat() { @@ -121,16 +121,16 @@ func ReadBioSequencesBatch(filenames ...string) (obiseq.IBioSequenceBatch, error iterator, err = reader(list_of_files[0], opts...) if err != nil { - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } list_of_files = list_of_files[1:] - others := make([]obiseq.IBioSequenceBatch, 0, len(list_of_files)) + others := make([]obiiter.IBioSequenceBatch, 0, len(list_of_files)) for _, fn := range list_of_files { r, err := reader(fn, opts...) if err != nil { - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } others = append(others, r) } @@ -152,7 +152,7 @@ func ReadBioSequencesBatch(filenames ...string) (obiseq.IBioSequenceBatch, error return iterator, nil } -func ReadBioSequences(filenames ...string) (obiseq.IBioSequence, error) { +func ReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { ib, err := ReadBioSequencesBatch(filenames...) return ib.SortBatches().IBioSequence(), err diff --git a/pkg/obitools/obiconvert/sequence_writer.go b/pkg/obitools/obiconvert/sequence_writer.go index f32c4eb..2b01900 100644 --- a/pkg/obitools/obiconvert/sequence_writer.go +++ b/pkg/obitools/obiconvert/sequence_writer.go @@ -4,11 +4,11 @@ import ( "log" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func WriteBioSequences(iterator obiseq.IBioSequence, filenames ...string) error { +func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error { opts := make([]obiformats.WithOption, 0, 10) @@ -65,10 +65,10 @@ func WriteBioSequences(iterator obiseq.IBioSequence, filenames ...string) error return nil } -func WriteBioSequencesBatch(iterator obiseq.IBioSequenceBatch, - terminalAction bool, filenames ...string) (obiseq.IBioSequenceBatch, error) { +func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch, + terminalAction bool, filenames ...string) (obiiter.IBioSequenceBatch, error) { - var newIter obiseq.IBioSequenceBatch + var newIter obiiter.IBioSequenceBatch opts := make([]obiformats.WithOption, 0, 10) @@ -119,12 +119,12 @@ func WriteBioSequencesBatch(iterator obiseq.IBioSequenceBatch, if err != nil { log.Fatalf("Write file error: %v", err) - return obiseq.NilIBioSequenceBatch, err + return obiiter.NilIBioSequenceBatch, err } if terminalAction { newIter.Recycle() - return obiseq.NilIBioSequenceBatch, nil + return obiiter.NilIBioSequenceBatch, nil } return newIter, nil diff --git a/pkg/obitools/obidistribute/distribute.go b/pkg/obitools/obidistribute/distribute.go index 731f8a0..eda5d34 100644 --- a/pkg/obitools/obidistribute/distribute.go +++ b/pkg/obitools/obidistribute/distribute.go @@ -4,12 +4,12 @@ import ( "log" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" ) -func DistributeSequence(sequences obiseq.IBioSequenceBatch) { +func DistributeSequence(sequences obiiter.IBioSequenceBatch) { opts := make([]obiformats.WithOption, 0, 10) diff --git a/pkg/obitools/obimultiplex/demultiplex.go b/pkg/obitools/obimultiplex/demultiplex.go index 55831f4..99f8f8d 100644 --- a/pkg/obitools/obimultiplex/demultiplex.go +++ b/pkg/obitools/obimultiplex/demultiplex.go @@ -3,13 +3,14 @@ package obimultiplex import ( "log" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obingslibrary" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" ) -func IExtractBarcodeBatches(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequenceBatch, error) { +func IExtractBarcodeBatches(iterator obiiter.IBioSequenceBatch) (obiiter.IBioSequenceBatch, error) { opts := make([]obingslibrary.WithOption, 0, 10) @@ -36,7 +37,7 @@ func IExtractBarcodeBatches(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSeque newIter = newIter.Rebatch(obioptions.CLIBatchSize()) } - var unidentified obiseq.IBioSequenceBatch + var unidentified obiiter.IBioSequenceBatch if CLIUnidentifiedFileName() != "" { log.Printf("Unassigned sequences saved in file: %s\n", CLIUnidentifiedFileName()) unidentified, newIter = newIter.DivideOn(obiseq.HasAttribute("demultiplex_error"), diff --git a/pkg/obitools/obipairing/options.go b/pkg/obitools/obipairing/options.go index 3c4e28a..7a874ed 100644 --- a/pkg/obitools/obipairing/options.go +++ b/pkg/obitools/obipairing/options.go @@ -1,7 +1,7 @@ package obipairing import ( - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "github.com/DavidGamba/go-getoptions" ) @@ -47,15 +47,15 @@ func OptionSet(options *getoptions.GetOpt) { PairingOptionSet(options) } -func IBatchPairedSequence() (obiseq.IPairedBioSequenceBatch, error) { +func IBatchPairedSequence() (obiiter.IPairedBioSequenceBatch, error) { forward, err := obiconvert.ReadBioSequencesBatch(_ForwardFiles...) if err != nil { - return obiseq.NilIPairedBioSequenceBatch, err + return obiiter.NilIPairedBioSequenceBatch, err } reverse, err := obiconvert.ReadBioSequencesBatch(_ReverseFiles...) if err != nil { - return obiseq.NilIPairedBioSequenceBatch, err + return obiiter.NilIPairedBioSequenceBatch, err } paired := forward.PairWith(reverse) diff --git a/pkg/obitools/obipairing/pairing.go b/pkg/obitools/obipairing/pairing.go index d5ff1ce..986f203 100644 --- a/pkg/obitools/obipairing/pairing.go +++ b/pkg/obitools/obipairing/pairing.go @@ -7,6 +7,7 @@ import ( "runtime" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obialign" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "github.com/schollz/progressbar/v3" ) @@ -202,8 +203,10 @@ func AssemblePESequences(seqA, seqB *obiseq.BioSequence, // The function returns an iterator over batches of obiseq.Biosequence object. // each pair of processed sequences produces one sequence in the result iterator. // -func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, - gap float64, delta, minOverlap int, minIdentity float64, withStats bool, sizes ...int) obiseq.IBioSequenceBatch { +func IAssemblePESequencesBatch(iterator obiiter.IPairedBioSequenceBatch, + gap float64, delta, minOverlap int, + minIdentity float64, + withStats bool, sizes ...int) obiiter.IBioSequenceBatch { nworkers := runtime.NumCPU() * 3 / 2 buffsize := iterator.BufferSize() @@ -216,7 +219,7 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, buffsize = sizes[1] } - newIter := obiseq.MakeIBioSequenceBatch(buffsize) + newIter := obiiter.MakeIBioSequenceBatch(buffsize) newIter.Add(nworkers) @@ -233,7 +236,7 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, progressbar.OptionShowIts(), progressbar.OptionSetDescription("[Sequence Pairing]")) - f := func(iterator obiseq.IPairedBioSequenceBatch, wid int) { + f := func(iterator obiiter.IPairedBioSequenceBatch, wid int) { arena := obialign.MakePEAlignArena(150, 150) for iterator.Next() { @@ -249,7 +252,7 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch, } } bar.Add(batch.Length() - processed) - newIter.Push(obiseq.MakeBioSequenceBatch( + newIter.Push(obiiter.MakeBioSequenceBatch( batch.Order(), cons, )) diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index 2dd6b41..7515dfa 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -2,13 +2,13 @@ package obipcr import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiapat" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" ) // PCR iterates over sequences provided by a obiseq.IBioSequenceBatch // and returns an other obiseq.IBioSequenceBatch distributing // obiseq.BioSequenceBatch containing the selected amplicon sequences. -func PCR(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequenceBatch, error) { +func PCR(iterator obiiter.IBioSequenceBatch) (obiiter.IBioSequenceBatch, error) { opts := make([]obiapat.WithOption, 0, 10) diff --git a/pkg/obitools/obiuniq/unique.go b/pkg/obitools/obiuniq/unique.go index 20aa0cb..f468103 100644 --- a/pkg/obitools/obiuniq/unique.go +++ b/pkg/obitools/obiuniq/unique.go @@ -4,11 +4,11 @@ import ( "log" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obichunk" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) -func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch { +func Unique(sequences obiiter.IBioSequenceBatch) obiiter.IBioSequenceBatch { options := make([]obichunk.WithOption, 0, 30)