Factorize code between fasta and fastq writers

This commit is contained in:
Eric Coissac
2024-08-01 17:22:46 +02:00
parent 776b8f75b7
commit 6a2f867ae1
5 changed files with 65 additions and 90 deletions

View File

@ -7,8 +7,6 @@ import (
"io" "io"
"os" "os"
"strings" "strings"
"sync"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -135,21 +133,16 @@ func WriteFasta(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers() nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe() chunkchan := WriteSeqFileChunk(file, opt.CloseFile())
chunkchan := make(chan FileChunk)
header_format := opt.FormatFastSeqHeader() header_format := opt.FormatFastSeqHeader()
newIter.Add(nwriters) newIter.Add(nwriters)
var waitWriter sync.WaitGroup
go func() { go func() {
newIter.WaitAndClose() newIter.WaitAndClose()
for len(chunkchan) > 0 {
time.Sleep(time.Millisecond)
}
close(chunkchan) close(chunkchan)
waitWriter.Wait() log.Warnf("Writing fasta file done")
}() }()
ff := func(iterator obiiter.IBioSequence) { ff := func(iterator obiiter.IBioSequence) {
@ -159,10 +152,12 @@ func WriteFasta(iterator obiiter.IBioSequence,
log.Debugf("Formating fasta chunk %d", batch.Order()) log.Debugf("Formating fasta chunk %d", batch.Order())
chunkchan <- FileChunk{ chunkchan <- SeqFileChunk{
FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()), Source: batch.Source(),
batch.Order(), Raw: bytes.NewBuffer(FormatFastaBatch(batch, header_format, opt.SkipEmptySequence())),
Order: batch.Order(),
} }
log.Debugf("Fasta chunk %d formated", batch.Order()) log.Debugf("Fasta chunk %d formated", batch.Order())
newIter.Push(batch) newIter.Push(batch)
@ -176,39 +171,6 @@ func WriteFasta(iterator obiiter.IBioSequence,
go ff(iterator.Split()) go ff(iterator.Split())
} }
next_to_send := 0
received := make(map[int]FileChunk, 100)
waitWriter.Add(1)
go func() {
for chunk := range chunkchan {
if chunk.order == next_to_send {
file.Write(chunk.text)
log.Debugf("Fasta chunk %d written", chunk.order)
next_to_send++
chunk, ok := received[next_to_send]
for ok {
file.Write(chunk.text)
log.Debugf("Fasta chunk %d written", chunk.order)
delete(received, next_to_send)
next_to_send++
chunk, ok = received[next_to_send]
}
} else {
log.Debugf("Store Fasta chunk %d", chunk.order)
received[chunk.order] = chunk
}
}
file.Close()
log.Debugln("End of the fasta file writing")
obiiter.UnregisterPipe()
waitWriter.Done()
}()
return newIter, nil return newIter, nil
} }

View File

@ -105,8 +105,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers() nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe() chunkchan := WriteSeqFileChunk(file, opt.CloseFile())
chunkchan := make(chan FileChunk)
header_format := opt.FormatFastSeqHeader() header_format := opt.FormatFastSeqHeader()
@ -126,9 +125,10 @@ func WriteFastq(iterator obiiter.IBioSequence,
ff := func(iterator obiiter.IBioSequence) { ff := func(iterator obiiter.IBioSequence) {
for iterator.Next() { for iterator.Next() {
batch := iterator.Get() batch := iterator.Get()
chunk := FileChunk{ chunk := SeqFileChunk{
FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()), Source: batch.Source(),
batch.Order(), Raw: bytes.NewBuffer(FormatFastqBatch(batch, header_format, opt.SkipEmptySequence())),
Order: batch.Order(),
} }
chunkchan <- chunk chunkchan <- chunk
newIter.Push(batch) newIter.Push(batch)
@ -142,44 +142,6 @@ func WriteFastq(iterator obiiter.IBioSequence,
go ff(iterator.Split()) go ff(iterator.Split())
} }
next_to_send := 0
received := make(map[int]FileChunk, 100)
waitWriter.Add(1)
go func() {
for chunk := range chunkchan {
if chunk.order == next_to_send {
if chunk.text[0] != '@' {
log.Panicln("WriteFastq: FASTQ format error")
}
file.Write(chunk.text)
next_to_send++
chunk, ok := received[next_to_send]
for ok {
if chunk.text[0] != '@' {
log.Panicln("WriteFastq: FASTQ format error")
}
file.Write(chunk.text)
delete(received, next_to_send)
next_to_send++
chunk, ok = received[next_to_send]
}
} else {
if _, ok := received[chunk.order]; ok {
log.Panicln("WriteFastq: Two chunks with the same number")
}
received[chunk.order] = chunk
}
}
file.Close()
log.Debugln("End of the fastq file writing")
obiiter.UnregisterPipe()
waitWriter.Done()
}()
return newIter, nil return newIter, nil
} }

View File

@ -15,7 +15,7 @@ type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error)
type SeqFileChunk struct { type SeqFileChunk struct {
Source string Source string
Raw io.Reader Raw *bytes.Buffer
Order int Order int
} }

View File

@ -0,0 +1,51 @@
package obiformats
import (
"io"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
log "github.com/sirupsen/logrus"
)
func WriteSeqFileChunk(
writer io.WriteCloser,
toBeClosed bool) ChannelSeqFileChunk {
obiiter.RegisterAPipe()
chunk_channel := make(ChannelSeqFileChunk)
go func() {
nextToPrint := 0
toBePrinted := make(map[int]SeqFileChunk)
for chunk := range chunk_channel {
if chunk.Order == nextToPrint {
_, _ = writer.Write(chunk.Raw.Bytes())
nextToPrint++
chunk, ok := toBePrinted[nextToPrint]
for ok {
_, _ = writer.Write(chunk.Raw.Bytes())
delete(toBePrinted, nextToPrint)
nextToPrint++
chunk, ok = toBePrinted[nextToPrint]
}
} else {
toBePrinted[chunk.Order] = chunk
}
}
if toBeClosed {
err := writer.Close()
if err != nil {
log.Fatalf("Cannot close the writer : %v", err)
}
}
obiiter.UnregisterPipe()
log.Warnf("The writer has been closed")
}()
return chunk_channel
}

View File

@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version // TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be // corresponds to the last commit, and not the one when the file will be
// commited // commited
var _Commit = "dfe2fc3" var _Commit = "776b8f7"
var _Version = "" var _Version = ""
// Version returns the version of the obitools package. // Version returns the version of the obitools package.