Debug fasta and fastq writer when the first sequence is hudge

Former-commit-id: d208ff838abb7e19e117067f6243298492d60f14
This commit is contained in:
Eric Coissac
2024-06-26 18:39:42 +02:00
parent 1835cb2cf3
commit e40d0bfbe7
7 changed files with 78 additions and 17 deletions

View File

@ -80,15 +80,24 @@ func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, ski
// Create a buffer to store the formatted sequences
var bs bytes.Buffer
lt := 0
for _, seq := range batch.Slice() {
lt += seq.Len()
}
// Iterate over each sequence in the batch
for i, seq := range batch.Slice() {
log.Debugf("FormatFastaBatch: #%d : %d seqs", batch.Order(), batch.Len())
first := true
for _, seq := range batch.Slice() {
// Check if the sequence is empty
if seq.Len() > 0 {
// Format the sequence using the provided formater function
formattedSeq := FormatFasta(seq, formater)
if i == 0 {
bs.Grow(len(formattedSeq) * len(batch.Slice()) * 5 / 4)
if first {
bs.Grow(lt + (len(formattedSeq)-seq.Len())*batch.Len()*5/4)
first = false
}
// Append the formatted sequence to the buffer
@ -148,10 +157,14 @@ func WriteFasta(iterator obiiter.IBioSequence,
batch := iterator.Get()
log.Debugf("Formating fasta chunk %d", batch.Order())
chunkchan <- FileChunck{
FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()),
batch.Order(),
}
log.Debugf("Fasta chunk %d formated", batch.Order())
newIter.Push(batch)
}
newIter.Done()
@ -171,15 +184,18 @@ func WriteFasta(iterator obiiter.IBioSequence,
for chunk := range chunkchan {
if chunk.order == next_to_send {
file.Write(chunk.text)
log.Debugf("Fasta chunk %d written", chunk.order)
next_to_send++
chunk, ok := received[next_to_send]
for ok {
file.Write(chunk.text)
log.Debugf("Fasta chunk %d written", chunk.order)
delete(received, next_to_send)
next_to_send++
chunk, ok = received[next_to_send]
}
} else {
log.Debugf("Store Fasta chunk %d", chunk.order)
received[chunk.order] = chunk
}

View File

@ -52,14 +52,24 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch,
formater FormatHeader, skipEmpty bool) []byte {
var bs bytes.Buffer
for i, seq := range batch.Slice() {
lt := 0
for _, seq := range batch.Slice() {
lt += seq.Len()
}
// Iterate over each sequence in the batch
first := true
for _, seq := range batch.Slice() {
if seq.Len() > 0 {
_formatFastq(&bs, seq, formater)
if i == 0 {
bs.Grow(len(bs.Bytes()) * len(batch.Slice()) * 5 / 4)
if first {
bs.Grow(lt + (len(bs.Bytes())-seq.Len())*batch.Len()*5/4)
first = false
}
} else {
if skipEmpty {
log.Warnf("Sequence %s is empty and skiped in output", seq.Id())

View File

@ -146,7 +146,7 @@ func _ParseGenbankFile(source string,
log.Warn("Empty id when parsing genbank file")
}
log.Debugf("End of sequence %s: %dbp ", id, seqBytes.Len())
// log.Debugf("End of sequence %s: %dbp ", id, seqBytes.Len())
sequence := obiseq.NewBioSequence(id,
seqBytes.Bytes(),
@ -168,8 +168,9 @@ func _ParseGenbankFile(source string,
sumlength += sequence.Len()
if len(sequences) == batch_size || sumlength > total_seq_size {
log.Debugln("Pushing sequences")
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
oo := chunck_order()
log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences")
out.Push(obiiter.MakeBioSequenceBatch(oo, sequences))
sequences = make(obiseq.BioSequenceSlice, 0, 100)
sumlength = 0
}
@ -218,13 +219,14 @@ func _ParseGenbankFile(source string,
}
log.Debugf("End of chunk %d : %s", chunks.order, line)
if len(sequences) > 0 {
log.Debugln("Pushing sequences")
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
oo := chunck_order()
log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences")
out.Push(obiiter.MakeBioSequenceBatch(oo, sequences))
}
}
log.Debug("End of the Genbank thread")
out.Done()
}
@ -255,6 +257,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
go func() {
newIter.WaitAndClose()
log.Debug("End of the genbank file ", opt.Source())
}()
if opt.FullFileBatch() {

View File

@ -437,6 +437,7 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence {
buffer = append(buffer, seqs.Slice()[i:(i+to_push)]...)
if len(buffer) == size {
newIter.Push(MakeBioSequenceBatch(order, buffer))
log.Debugf("Rebatch #%d pushd", order)
order++
buffer = obiseq.MakeBioSequenceSlice()
}
@ -444,9 +445,10 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence {
}
seqs.Recycle(false)
}
log.Debug("End of the rebatch loop")
if len(buffer) > 0 {
newIter.Push(MakeBioSequenceBatch(order, buffer))
log.Debugf("Final Rebatch #%d pushd", order)
}
newIter.Done()
@ -467,9 +469,11 @@ func (iterator IBioSequence) Recycle() {
for iterator.Next() {
// iterator.Get()
batch := iterator.Get()
log.Debugln("Recycling batch #", batch.Order())
o := batch.Order()
log.Debugln("Recycling batch #", o)
recycled += batch.Len()
batch.Recycle(true)
log.Debugln("Batch #", o, " recycled")
}
log.Debugf("End of the recycling of %d Bioseq objects", recycled)
}

View File

@ -23,6 +23,8 @@ var _ParallelFilesRead = 0
var _MaxAllowedCPU = runtime.NumCPU()
var _BatchSize = 2000
var _Pprof = false
var _PprofMudex = 10
var _PprofGoroutine = 6060
var _Quality_Shift_Input = byte(33)
var _Quality_Shift_Output = byte(33)
@ -56,6 +58,14 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
options.BoolVar(&_Pprof, "force-one-cpu", false,
options.Description("Force to use only one cpu core for parallel processing"))
options.IntVar(&_PprofMudex, "pprof-mutex", _PprofMudex,
options.GetEnv("OBIPPROFMUTEX"),
options.Description("Enable profiling of mutex lock."))
options.IntVar(&_PprofGoroutine, "pprof-goroutine", _PprofGoroutine,
options.GetEnv("OBIPPROFGOROUTINE"),
options.Description("Enable profiling of goroutine blocking profile."))
options.IntVar(&_BatchSize, "batch-size", _BatchSize,
options.GetEnv("OBIBATCHSIZE"),
options.Description("Number of sequence per batch for paralelle processing"))
@ -96,6 +106,24 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
log.Info(" go tool pprof -http=127.0.0.1:8080 'http://localhost:6060/debug/pprof/profile?seconds=30'")
}
if options.Called("pprof-mutex") {
url := "localhost:6060"
go http.ListenAndServe(url, nil)
runtime.SetMutexProfileFraction(_PprofMudex)
log.Infof("Start a pprof server at address %s/debug/pprof", url)
log.Info("Profil can be followed running concurrently the command :")
log.Info(" go tool pprof -http=127.0.0.1:8080 'http://localhost:6060/debug/pprof/mutex'")
}
if options.Called("pprof-goroutine") {
url := "localhost:6060"
go http.ListenAndServe(url, nil)
runtime.SetBlockProfileRate(_PprofGoroutine)
log.Infof("Start a pprof server at address %s/debug/pprof", url)
log.Info("Profil can be followed running concurrently the command :")
log.Info(" go tool pprof -http=127.0.0.1:8080 'http://localhost:6060/debug/pprof/block'")
}
// Handle user errors
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %s\n\n", err)

View File

@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be
// commited
var _Commit = "ab9b472"
var _Commit = "5073957"
var _Version = "Release 4.2.0"
// Version returns the version of the obitools package.

View File

@ -378,7 +378,7 @@ func CLIOBIMinion(itertator obiiter.IBioSequence) obiiter.IBioSequence {
}()
obiuniq.AddStatsOn(CLISampleAttribute())
obiuniq.AddStatsOn("sample:obiconsensus_weight")
// obiuniq.AddStatsOn("sample:obiconsensus_weight")
obiuniq.SetUniqueInMemory(false)
obiuniq.SetNoSingleton(CLINoSingleton())
return obiuniq.CLIUnique(newIter).Pipe(obiiter.WorkerPipe(obiannotate.AddSeqLengthWorker(), false))