Last bug on sequence writing

This commit is contained in:
2023-02-08 13:56:50 +01:00
parent 526bf79c7f
commit 8b70b1a5d8
2 changed files with 12 additions and 0 deletions

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -78,6 +79,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
header_format := opt.FormatFastSeqHeader() header_format := opt.FormatFastSeqHeader()
newIter.Add(nwriters) newIter.Add(nwriters)
var waitWriter sync.WaitGroup
go func() { go func() {
newIter.WaitAndClose() newIter.WaitAndClose()
@ -85,6 +87,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
close(chunkchan) close(chunkchan)
waitWriter.Wait()
obiiter.UnregisterPipe() obiiter.UnregisterPipe()
log.Debugln("End of the fasta file writing") log.Debugln("End of the fasta file writing")
}() }()
@ -112,6 +115,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
next_to_send := 0 next_to_send := 0
received := make(map[int]FileChunck, 100) received := make(map[int]FileChunck, 100)
waitWriter.Add(1)
go func() { go func() {
for chunk := range chunkchan { for chunk := range chunkchan {
if chunk.order == next_to_send { if chunk.order == next_to_send {
@ -136,6 +140,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
file.Close() file.Close()
} }
} }
waitWriter.Done()
}() }()

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"sync"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -69,12 +70,15 @@ func WriteFastq(iterator obiiter.IBioSequence,
newIter.Add(nwriters) newIter.Add(nwriters)
var waitWriter sync.WaitGroup
go func() { go func() {
newIter.WaitAndClose() newIter.WaitAndClose()
for len(chunkchan) > 0 { for len(chunkchan) > 0 {
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
close(chunkchan) close(chunkchan)
waitWriter.Wait()
obiiter.UnregisterPipe() obiiter.UnregisterPipe()
log.Debugln("End of the fastq file writing") log.Debugln("End of the fastq file writing")
}() }()
@ -101,6 +105,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
next_to_send := 0 next_to_send := 0
received := make(map[int]FileChunck, 100) received := make(map[int]FileChunck, 100)
waitWriter.Add(1)
go func() { go func() {
for chunk := range chunkchan { for chunk := range chunkchan {
if chunk.order == next_to_send { if chunk.order == next_to_send {
@ -125,6 +130,8 @@ func WriteFastq(iterator obiiter.IBioSequence,
file.Close() file.Close()
} }
} }
waitWriter.Done()
}() }()
return newIter, nil return newIter, nil