From e40d0bfbe7aa6f68dfdf47378c71593d161cccdf Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 26 Jun 2024 18:39:42 +0200 Subject: [PATCH] Debug fasta and fastq writer when the first sequence is hudge Former-commit-id: d208ff838abb7e19e117067f6243298492d60f14 --- pkg/obiformats/fastseq_write_fasta.go | 22 +++++++++++++++--- pkg/obiformats/fastseq_write_fastq.go | 18 +++++++++++---- pkg/obiformats/genbank_read.go | 15 +++++++----- pkg/obiiter/batchiterator.go | 8 +++++-- pkg/obioptions/options.go | 28 +++++++++++++++++++++++ pkg/obioptions/version.go | 2 +- pkg/obitools/obiconsensus/obiconsensus.go | 2 +- 7 files changed, 78 insertions(+), 17 deletions(-) diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 165d7c1..fbb2be8 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -80,15 +80,24 @@ func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, ski // Create a buffer to store the formatted sequences var bs bytes.Buffer + lt := 0 + + for _, seq := range batch.Slice() { + lt += seq.Len() + } + // Iterate over each sequence in the batch - for i, seq := range batch.Slice() { + log.Debugf("FormatFastaBatch: #%d : %d seqs", batch.Order(), batch.Len()) + first := true + for _, seq := range batch.Slice() { // Check if the sequence is empty if seq.Len() > 0 { // Format the sequence using the provided formater function formattedSeq := FormatFasta(seq, formater) - if i == 0 { - bs.Grow(len(formattedSeq) * len(batch.Slice()) * 5 / 4) + if first { + bs.Grow(lt + (len(formattedSeq)-seq.Len())*batch.Len()*5/4) + first = false } // Append the formatted sequence to the buffer @@ -148,10 +157,14 @@ func WriteFasta(iterator obiiter.IBioSequence, batch := iterator.Get() + log.Debugf("Formating fasta chunk %d", batch.Order()) + chunkchan <- FileChunck{ FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()), batch.Order(), } + log.Debugf("Fasta chunk %d formated", batch.Order()) + newIter.Push(batch) } newIter.Done() @@ -171,15 +184,18 @@ func WriteFasta(iterator obiiter.IBioSequence, for chunk := range chunkchan { if chunk.order == next_to_send { file.Write(chunk.text) + log.Debugf("Fasta chunk %d written", chunk.order) next_to_send++ chunk, ok := received[next_to_send] for ok { file.Write(chunk.text) + log.Debugf("Fasta chunk %d written", chunk.order) delete(received, next_to_send) next_to_send++ chunk, ok = received[next_to_send] } } else { + log.Debugf("Store Fasta chunk %d", chunk.order) received[chunk.order] = chunk } diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 9d8842f..93c5e31 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -52,14 +52,24 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) []byte { var bs bytes.Buffer - for i, seq := range batch.Slice() { + lt := 0 + + for _, seq := range batch.Slice() { + lt += seq.Len() + } + + // Iterate over each sequence in the batch + first := true + + for _, seq := range batch.Slice() { if seq.Len() > 0 { _formatFastq(&bs, seq, formater) - if i == 0 { - - bs.Grow(len(bs.Bytes()) * len(batch.Slice()) * 5 / 4) + if first { + bs.Grow(lt + (len(bs.Bytes())-seq.Len())*batch.Len()*5/4) + first = false } + } else { if skipEmpty { log.Warnf("Sequence %s is empty and skiped in output", seq.Id()) diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index 3017995..0940d61 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -146,7 +146,7 @@ func _ParseGenbankFile(source string, log.Warn("Empty id when parsing genbank file") } - log.Debugf("End of sequence %s: %dbp ", id, seqBytes.Len()) + // log.Debugf("End of sequence %s: %dbp ", id, seqBytes.Len()) sequence := obiseq.NewBioSequence(id, seqBytes.Bytes(), @@ -168,8 +168,9 @@ func _ParseGenbankFile(source string, sumlength += sequence.Len() if len(sequences) == batch_size || sumlength > total_seq_size { - log.Debugln("Pushing sequences") - out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) + oo := chunck_order() + log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences") + out.Push(obiiter.MakeBioSequenceBatch(oo, sequences)) sequences = make(obiseq.BioSequenceSlice, 0, 100) sumlength = 0 } @@ -218,13 +219,14 @@ func _ParseGenbankFile(source string, } - log.Debugf("End of chunk %d : %s", chunks.order, line) if len(sequences) > 0 { - log.Debugln("Pushing sequences") - out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) + oo := chunck_order() + log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences") + out.Push(obiiter.MakeBioSequenceBatch(oo, sequences)) } } + log.Debug("End of the Genbank thread") out.Done() } @@ -255,6 +257,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence { go func() { newIter.WaitAndClose() + log.Debug("End of the genbank file ", opt.Source()) }() if opt.FullFileBatch() { diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 401ffce..b82ee2c 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -437,6 +437,7 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { buffer = append(buffer, seqs.Slice()[i:(i+to_push)]...) if len(buffer) == size { newIter.Push(MakeBioSequenceBatch(order, buffer)) + log.Debugf("Rebatch #%d pushd", order) order++ buffer = obiseq.MakeBioSequenceSlice() } @@ -444,9 +445,10 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { } seqs.Recycle(false) } - + log.Debug("End of the rebatch loop") if len(buffer) > 0 { newIter.Push(MakeBioSequenceBatch(order, buffer)) + log.Debugf("Final Rebatch #%d pushd", order) } newIter.Done() @@ -467,9 +469,11 @@ func (iterator IBioSequence) Recycle() { for iterator.Next() { // iterator.Get() batch := iterator.Get() - log.Debugln("Recycling batch #", batch.Order()) + o := batch.Order() + log.Debugln("Recycling batch #", o) recycled += batch.Len() batch.Recycle(true) + log.Debugln("Batch #", o, " recycled") } log.Debugf("End of the recycling of %d Bioseq objects", recycled) } diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index a5cdc14..773dcf3 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -23,6 +23,8 @@ var _ParallelFilesRead = 0 var _MaxAllowedCPU = runtime.NumCPU() var _BatchSize = 2000 var _Pprof = false +var _PprofMudex = 10 +var _PprofGoroutine = 6060 var _Quality_Shift_Input = byte(33) var _Quality_Shift_Output = byte(33) @@ -56,6 +58,14 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser options.BoolVar(&_Pprof, "force-one-cpu", false, options.Description("Force to use only one cpu core for parallel processing")) + options.IntVar(&_PprofMudex, "pprof-mutex", _PprofMudex, + options.GetEnv("OBIPPROFMUTEX"), + options.Description("Enable profiling of mutex lock.")) + + options.IntVar(&_PprofGoroutine, "pprof-goroutine", _PprofGoroutine, + options.GetEnv("OBIPPROFGOROUTINE"), + options.Description("Enable profiling of goroutine blocking profile.")) + options.IntVar(&_BatchSize, "batch-size", _BatchSize, options.GetEnv("OBIBATCHSIZE"), options.Description("Number of sequence per batch for paralelle processing")) @@ -96,6 +106,24 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser log.Info(" go tool pprof -http=127.0.0.1:8080 'http://localhost:6060/debug/pprof/profile?seconds=30'") } + if options.Called("pprof-mutex") { + url := "localhost:6060" + go http.ListenAndServe(url, nil) + runtime.SetMutexProfileFraction(_PprofMudex) + log.Infof("Start a pprof server at address %s/debug/pprof", url) + log.Info("Profil can be followed running concurrently the command :") + log.Info(" go tool pprof -http=127.0.0.1:8080 'http://localhost:6060/debug/pprof/mutex'") + } + + if options.Called("pprof-goroutine") { + url := "localhost:6060" + go http.ListenAndServe(url, nil) + runtime.SetBlockProfileRate(_PprofGoroutine) + log.Infof("Start a pprof server at address %s/debug/pprof", url) + log.Info("Profil can be followed running concurrently the command :") + log.Info(" go tool pprof -http=127.0.0.1:8080 'http://localhost:6060/debug/pprof/block'") + } + // Handle user errors if err != nil { fmt.Fprintf(os.Stderr, "ERROR: %s\n\n", err) diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 5124a22..5417461 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,7 +7,7 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "ab9b472" +var _Commit = "5073957" var _Version = "Release 4.2.0" // Version returns the version of the obitools package. diff --git a/pkg/obitools/obiconsensus/obiconsensus.go b/pkg/obitools/obiconsensus/obiconsensus.go index 3ee3cad..cc1fd9c 100644 --- a/pkg/obitools/obiconsensus/obiconsensus.go +++ b/pkg/obitools/obiconsensus/obiconsensus.go @@ -378,7 +378,7 @@ func CLIOBIMinion(itertator obiiter.IBioSequence) obiiter.IBioSequence { }() obiuniq.AddStatsOn(CLISampleAttribute()) - obiuniq.AddStatsOn("sample:obiconsensus_weight") + // obiuniq.AddStatsOn("sample:obiconsensus_weight") obiuniq.SetUniqueInMemory(false) obiuniq.SetNoSingleton(CLINoSingleton()) return obiuniq.CLIUnique(newIter).Pipe(obiiter.WorkerPipe(obiannotate.AddSeqLengthWorker(), false))