From 6a2f867ae1d235e40dacd2d66145c3b654c14e33 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 1 Aug 2024 17:22:46 +0200 Subject: [PATCH] Factorize code between fasta and fastq writers --- pkg/obiformats/fastseq_write_fasta.go | 52 ++++----------------------- pkg/obiformats/fastseq_write_fastq.go | 48 +++---------------------- pkg/obiformats/seqfile_chunk_read.go | 2 +- pkg/obiformats/seqfile_chunk_write.go | 51 ++++++++++++++++++++++++++ pkg/obioptions/version.go | 2 +- 5 files changed, 65 insertions(+), 90 deletions(-) create mode 100644 pkg/obiformats/seqfile_chunk_write.go diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index f310e16..5e8e638 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -7,8 +7,6 @@ import ( "io" "os" "strings" - "sync" - "time" log "github.com/sirupsen/logrus" @@ -135,21 +133,16 @@ func WriteFasta(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() - obiiter.RegisterAPipe() - chunkchan := make(chan FileChunk) + chunkchan := WriteSeqFileChunk(file, opt.CloseFile()) header_format := opt.FormatFastSeqHeader() newIter.Add(nwriters) - var waitWriter sync.WaitGroup go func() { newIter.WaitAndClose() - for len(chunkchan) > 0 { - time.Sleep(time.Millisecond) - } close(chunkchan) - waitWriter.Wait() + log.Warnf("Writing fasta file done") }() ff := func(iterator obiiter.IBioSequence) { @@ -159,10 +152,12 @@ func WriteFasta(iterator obiiter.IBioSequence, log.Debugf("Formating fasta chunk %d", batch.Order()) - chunkchan <- FileChunk{ - FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()), - batch.Order(), + chunkchan <- SeqFileChunk{ + Source: batch.Source(), + Raw: bytes.NewBuffer(FormatFastaBatch(batch, header_format, opt.SkipEmptySequence())), + Order: batch.Order(), } + log.Debugf("Fasta chunk %d formated", batch.Order()) newIter.Push(batch) @@ -176,39 +171,6 @@ func WriteFasta(iterator obiiter.IBioSequence, go ff(iterator.Split()) } - next_to_send := 0 - received := make(map[int]FileChunk, 100) - - waitWriter.Add(1) - go func() { - for chunk := range chunkchan { - if chunk.order == next_to_send { - file.Write(chunk.text) - log.Debugf("Fasta chunk %d written", chunk.order) - next_to_send++ - chunk, ok := received[next_to_send] - for ok { - file.Write(chunk.text) - log.Debugf("Fasta chunk %d written", chunk.order) - delete(received, next_to_send) - next_to_send++ - chunk, ok = received[next_to_send] - } - } else { - log.Debugf("Store Fasta chunk %d", chunk.order) - received[chunk.order] = chunk - } - - } - - file.Close() - - log.Debugln("End of the fasta file writing") - obiiter.UnregisterPipe() - waitWriter.Done() - - }() - return newIter, nil } diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 9c3d16e..9974f3c 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -105,8 +105,7 @@ func WriteFastq(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() - obiiter.RegisterAPipe() - chunkchan := make(chan FileChunk) + chunkchan := WriteSeqFileChunk(file, opt.CloseFile()) header_format := opt.FormatFastSeqHeader() @@ -126,9 +125,10 @@ func WriteFastq(iterator obiiter.IBioSequence, ff := func(iterator obiiter.IBioSequence) { for iterator.Next() { batch := iterator.Get() - chunk := FileChunk{ - FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()), - batch.Order(), + chunk := SeqFileChunk{ + Source: batch.Source(), + Raw: bytes.NewBuffer(FormatFastqBatch(batch, header_format, opt.SkipEmptySequence())), + Order: batch.Order(), } chunkchan <- chunk newIter.Push(batch) @@ -142,44 +142,6 @@ func WriteFastq(iterator obiiter.IBioSequence, go ff(iterator.Split()) } - next_to_send := 0 - received := make(map[int]FileChunk, 100) - - waitWriter.Add(1) - go func() { - for chunk := range chunkchan { - if chunk.order == next_to_send { - if chunk.text[0] != '@' { - log.Panicln("WriteFastq: FASTQ format error") - } - file.Write(chunk.text) - next_to_send++ - chunk, ok := received[next_to_send] - for ok { - if chunk.text[0] != '@' { - log.Panicln("WriteFastq: FASTQ format error") - } - file.Write(chunk.text) - delete(received, next_to_send) - next_to_send++ - chunk, ok = received[next_to_send] - } - } else { - if _, ok := received[chunk.order]; ok { - log.Panicln("WriteFastq: Two chunks with the same number") - } - received[chunk.order] = chunk - } - - } - - file.Close() - - log.Debugln("End of the fastq file writing") - obiiter.UnregisterPipe() - waitWriter.Done() - }() - return newIter, nil } diff --git a/pkg/obiformats/seqfile_chunk_read.go b/pkg/obiformats/seqfile_chunk_read.go index f53489d..fd428bd 100644 --- a/pkg/obiformats/seqfile_chunk_read.go +++ b/pkg/obiformats/seqfile_chunk_read.go @@ -15,7 +15,7 @@ type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error) type SeqFileChunk struct { Source string - Raw io.Reader + Raw *bytes.Buffer Order int } diff --git a/pkg/obiformats/seqfile_chunk_write.go b/pkg/obiformats/seqfile_chunk_write.go new file mode 100644 index 0000000..acc566a --- /dev/null +++ b/pkg/obiformats/seqfile_chunk_write.go @@ -0,0 +1,51 @@ +package obiformats + +import ( + "io" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" + + log "github.com/sirupsen/logrus" +) + +func WriteSeqFileChunk( + writer io.WriteCloser, + toBeClosed bool) ChannelSeqFileChunk { + + obiiter.RegisterAPipe() + + chunk_channel := make(ChannelSeqFileChunk) + + go func() { + nextToPrint := 0 + toBePrinted := make(map[int]SeqFileChunk) + for chunk := range chunk_channel { + if chunk.Order == nextToPrint { + _, _ = writer.Write(chunk.Raw.Bytes()) + nextToPrint++ + + chunk, ok := toBePrinted[nextToPrint] + for ok { + _, _ = writer.Write(chunk.Raw.Bytes()) + delete(toBePrinted, nextToPrint) + nextToPrint++ + chunk, ok = toBePrinted[nextToPrint] + } + } else { + toBePrinted[chunk.Order] = chunk + } + } + + if toBeClosed { + err := writer.Close() + if err != nil { + log.Fatalf("Cannot close the writer : %v", err) + } + } + + obiiter.UnregisterPipe() + log.Warnf("The writer has been closed") + }() + + return chunk_channel +} diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index dfb6d5f..c46714d 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,7 +7,7 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "dfe2fc3" +var _Commit = "776b8f7" var _Version = "" // Version returns the version of the obitools package.