From 526bf79c7f7b8e6acf068fd2f4fe3b901cc604bc Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 8 Feb 2023 13:14:26 +0100 Subject: [PATCH] Patch for some lost of data during sequence writing --- cmd/obitools/obiannotate/main.go | 4 +++ cmd/obitools/obiclean/main.go | 4 +++ cmd/obitools/obicomplement/main.go | 4 +++ cmd/obitools/obiconvert/main.go | 4 +++ cmd/obitools/obidistribute/main.go | 4 +++ cmd/obitools/obigrep/main.go | 3 +++ cmd/obitools/obimultiplex/main.go | 3 +++ cmd/obitools/obipairing/main.go | 3 +++ cmd/obitools/obipcr/main.go | 3 +++ cmd/obitools/obirefidx/main.go | 6 +++-- cmd/obitools/obitag/main.go | 2 ++ cmd/obitools/obiuniq/main.go | 4 +++ pkg/obiformats/fastseq_write_fasta.go | 6 +++++ pkg/obiformats/fastseq_write_fastq.go | 3 ++- pkg/obiiter/batchiterator.go | 38 ++++++++++++++++++++++----- pkg/obiiter/pairedbatchiterator.go | 18 +++++++++++++ 16 files changed, 99 insertions(+), 10 deletions(-) diff --git a/cmd/obitools/obiannotate/main.go b/cmd/obitools/obiannotate/main.go index 84dbcc6..f652972 100644 --- a/cmd/obitools/obiannotate/main.go +++ b/cmd/obitools/obiannotate/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiannotate" @@ -36,4 +37,7 @@ func main() { sequences, _ := obiconvert.ReadBioSequences(args...) annotator := obiannotate.CLIAnnotationPipeline() obiconvert.WriteBioSequences(sequences.Pipe(annotator), true) + + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obiclean/main.go b/cmd/obitools/obiclean/main.go index 4788b3a..2cbce97 100644 --- a/cmd/obitools/obiclean/main.go +++ b/cmd/obitools/obiclean/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiclean" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -19,4 +20,7 @@ func main() { cleaned := obiclean.IOBIClean(fs) obiconvert.WriteBioSequences(cleaned, true) + + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obicomplement/main.go b/cmd/obitools/obicomplement/main.go index 3036599..2204ca9 100644 --- a/cmd/obitools/obicomplement/main.go +++ b/cmd/obitools/obicomplement/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -18,4 +19,7 @@ func main() { comp := fs.MakeIWorker(obiseq.ReverseComplementWorker(true)) obiconvert.WriteBioSequences(comp, true) + + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obiconvert/main.go b/cmd/obitools/obiconvert/main.go index e88a62f..bfb9f60 100644 --- a/cmd/obitools/obiconvert/main.go +++ b/cmd/obitools/obiconvert/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" @@ -15,4 +16,7 @@ func main() { fs, _ := obiconvert.ReadBioSequences(args...) obiconvert.WriteBioSequences(fs, true) + + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obidistribute/main.go b/cmd/obitools/obidistribute/main.go index 1c636a7..29c6ef8 100644 --- a/cmd/obitools/obidistribute/main.go +++ b/cmd/obitools/obidistribute/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obidistribute" @@ -16,4 +17,7 @@ func main() { fs, _ := obiconvert.ReadBioSequences(args...) obidistribute.DistributeSequence(fs) + + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obigrep/main.go b/cmd/obitools/obigrep/main.go index eea5cde..e17e12b 100644 --- a/cmd/obitools/obigrep/main.go +++ b/cmd/obitools/obigrep/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -36,4 +37,6 @@ func main() { sequences, _ := obiconvert.ReadBioSequences(args...) selected := obigrep.IFilterSequence(sequences) obiconvert.WriteBioSequences(selected, true) + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obimultiplex/main.go b/cmd/obitools/obimultiplex/main.go index ccf44d8..638c597 100644 --- a/cmd/obitools/obimultiplex/main.go +++ b/cmd/obitools/obimultiplex/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obimultiplex" @@ -32,4 +33,6 @@ func main() { amplicons, _ := obimultiplex.IExtractBarcode(sequences) obiconvert.WriteBioSequences(amplicons, true) amplicons.Wait() + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obipairing/main.go b/cmd/obitools/obipairing/main.go index 90ade38..731c033 100644 --- a/cmd/obitools/obipairing/main.go +++ b/cmd/obitools/obipairing/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipairing" @@ -39,4 +40,6 @@ func main() { obioptions.CLIParallelWorkers(), ) obiconvert.WriteBioSequences(paired, true) + + obiiter.WaitForLastPipe() } diff --git a/cmd/obitools/obipcr/main.go b/cmd/obitools/obipcr/main.go index c37c8c4..2e7df29 100644 --- a/cmd/obitools/obipcr/main.go +++ b/cmd/obitools/obipcr/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipcr" @@ -33,4 +34,6 @@ func main() { sequences, _ := obiconvert.ReadBioSequences(args...) amplicons, _ := obipcr.PCR(sequences) obiconvert.WriteBioSequences(amplicons, true) + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obirefidx/main.go b/cmd/obitools/obirefidx/main.go index 12e6db3..f469d75 100644 --- a/cmd/obitools/obirefidx/main.go +++ b/cmd/obitools/obirefidx/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obirefidx" @@ -17,6 +18,7 @@ func main() { fs, _ := obiconvert.ReadBioSequences(args...) indexed := obirefidx.IndexReferenceDB(fs) - written, _ := obiconvert.WriteBioSequences(indexed, false) - written.Consume() + obiconvert.WriteBioSequences(indexed, true) + obiiter.WaitForLastPipe() + } diff --git a/cmd/obitools/obitag/main.go b/cmd/obitools/obitag/main.go index 21efffe..f0bc2f8 100644 --- a/cmd/obitools/obitag/main.go +++ b/cmd/obitools/obitag/main.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obitag" @@ -36,6 +37,7 @@ func main() { identified := obitag.AssignTaxonomy(fs) obiconvert.WriteBioSequences(identified, true) + obiiter.WaitForLastPipe() fmt.Println("") } diff --git a/cmd/obitools/obiuniq/main.go b/cmd/obitools/obiuniq/main.go index 3ad4360..ccc0440 100644 --- a/cmd/obitools/obiuniq/main.go +++ b/cmd/obitools/obiuniq/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" @@ -36,4 +37,7 @@ func main() { sequences, _ := obiconvert.ReadBioSequences(args...) unique := obiuniq.Unique(sequences) obiconvert.WriteBioSequences(unique, true) + + obiiter.WaitForLastPipe() + } diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 34e6ad2..404c03a 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -6,6 +6,7 @@ import ( "io" "os" "strings" + "time" log "github.com/sirupsen/logrus" @@ -71,6 +72,7 @@ func WriteFasta(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() + obiiter.RegisterAPipe() chunkchan := make(chan FileChunck) header_format := opt.FormatFastSeqHeader() @@ -79,7 +81,11 @@ func WriteFasta(iterator obiiter.IBioSequence, go func() { newIter.WaitAndClose() + for len(chunkchan) > 0 { + time.Sleep(time.Millisecond) + } close(chunkchan) + obiiter.UnregisterPipe() log.Debugln("End of the fasta file writing") }() diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 99f7749..9f37aff 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -61,6 +61,7 @@ func WriteFastq(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() + obiiter.RegisterAPipe() chunkchan := make(chan FileChunck) header_format := opt.FormatFastSeqHeader() @@ -74,6 +75,7 @@ func WriteFastq(iterator obiiter.IBioSequence, time.Sleep(time.Millisecond) } close(chunkchan) + obiiter.UnregisterPipe() log.Debugln("End of the fastq file writing") }() @@ -123,7 +125,6 @@ func WriteFastq(iterator obiiter.IBioSequence, file.Close() } } - }() return newIter, nil diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 5ca2967..573e05a 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -15,6 +15,25 @@ import ( "github.com/tevino/abool/v2" ) +var globalLocker sync.WaitGroup +var globalLockerCounter = 0 + +func RegisterAPipe() { + globalLocker.Add(1) + globalLockerCounter++ + log.Debugln(globalLockerCounter, " Pipes are registered now") +} + +func UnregisterPipe() { + globalLocker.Done() + globalLockerCounter-- + log.Debugln(globalLockerCounter, "are still registered") +} + +func WaitForLastPipe() { + globalLocker.Wait() +} + // Structure implementing an iterator over bioseq.BioSequenceBatch // based on a channel. type _IBioSequence struct { @@ -61,6 +80,9 @@ func MakeIBioSequence(sizes ...int) IBioSequence { lock := sync.RWMutex{} i.lock = &lock ii := IBioSequence{&i} + + RegisterAPipe() + return ii } @@ -229,6 +251,7 @@ func (iterator IBioSequence) Push(batch BioSequenceBatch) { func (iterator IBioSequence) Close() { close(iterator.pointer.channel) + UnregisterPipe() } func (iterator IBioSequence) WaitAndClose() { @@ -237,6 +260,7 @@ func (iterator IBioSequence) WaitAndClose() { for len(iterator.Channel()) > 0 { time.Sleep(time.Millisecond) } + iterator.Close() } @@ -258,20 +282,21 @@ func (iterator IBioSequence) SortBatches(sizes ...int) IBioSequence { newIter.Add(1) go func() { - newIter.Wait() - close(newIter.pointer.channel) + newIter.WaitAndClose() }() next_to_send := 0 + //log.Println("wait for batch #", next_to_send) received := make(map[int]BioSequenceBatch) go func() { for iterator.Next() { batch := iterator.Get() - // log.Println("Pushd seq #", batch.order, next_to_send) + // log.Println("\nPushd seq #\n", batch.order, next_to_send) if batch.order == next_to_send { newIter.pointer.channel <- batch next_to_send++ + //log.Println("\nwait for batch #\n", next_to_send) batch, ok := received[next_to_send] for ok { newIter.pointer.channel <- batch @@ -386,8 +411,7 @@ func (iterator IBioSequence) Rebatch(size int, sizes ...int) IBioSequence { newIter.Add(1) go func() { - newIter.Wait() - close(newIter.pointer.channel) + newIter.WaitAndClose() }() go func() { @@ -427,6 +451,7 @@ func (iterator IBioSequence) Recycle() { for iterator.Next() { // iterator.Get() batch := iterator.Get() + log.Debugln("Recycling batch #", batch.Order()) for _, seq := range batch.Slice() { seq.Recycle() recycled++ @@ -488,8 +513,7 @@ func (iterator IBioSequence) PairWith(reverse IBioSequence, newIter.Add(1) go func() { - newIter.Wait() - close(newIter.Channel()) + newIter.WaitAndClose() log.Println("End of association of paired reads") }() diff --git a/pkg/obiiter/pairedbatchiterator.go b/pkg/obiiter/pairedbatchiterator.go index 0b851bc..004c0a5 100644 --- a/pkg/obiiter/pairedbatchiterator.go +++ b/pkg/obiiter/pairedbatchiterator.go @@ -2,6 +2,7 @@ package obiiter import ( "sync" + "time" log "github.com/sirupsen/logrus" @@ -96,6 +97,8 @@ func MakeIPairedBioSequenceBatch(sizes ...int) IPairedBioSequenceBatch { waiting := sync.WaitGroup{} i.all_done = &waiting ii := IPairedBioSequenceBatch{&i} + + RegisterAPipe() return ii } @@ -115,6 +118,21 @@ func (iterator IPairedBioSequenceBatch) Channel() chan PairedBioSequenceBatch { return iterator.pointer.channel } +func (iterator IPairedBioSequenceBatch) Close() { + close(iterator.pointer.channel) + UnregisterPipe() +} + +func (iterator IPairedBioSequenceBatch) WaitAndClose() { + iterator.Wait() + + for len(iterator.Channel()) > 0 { + time.Sleep(time.Millisecond) + } + + iterator.Close() +} + func (iterator IPairedBioSequenceBatch) IsNil() bool { return iterator.pointer == nil }