From 886b5d9a969983275e064fe72d486b90acb25f84 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 5 Aug 2024 10:48:28 +0200 Subject: [PATCH] Optimize memory for readers and writers --- pkg/obiformats/embl_read.go | 2 +- pkg/obiformats/fastaseq_read.go | 4 ++-- pkg/obiformats/fastqseq_read.go | 4 ++-- pkg/obiformats/fastseq_write_fasta.go | 3 +-- pkg/obiformats/fastseq_write_fastq.go | 2 +- pkg/obiformats/genbank_read.go | 2 +- pkg/obiformats/seqfile_chunk_read.go | 2 ++ pkg/obiformats/seqfile_chunk_write.go | 2 +- pkg/obiiter/batchiterator.go | 33 +++++++++++++++++++++++++++ pkg/obiiter/paired.go | 5 ++-- pkg/obioptions/version.go | 2 +- 11 files changed, 48 insertions(+), 13 deletions(-) diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index dddf1e5..1cd5b3c 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -187,7 +187,7 @@ func _ParseEmblFile( func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) - buff := make([]byte, 1024*1024*1024*256) + buff := make([]byte, 1024*1024*512) entry_channel := ReadSeqFileChunk( opt.Source(), diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index 8e9408c..cb162db 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -232,7 +232,7 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e nworker := opt.ParallelWorkers() - buff := make([]byte, 1024*1024*1024) + buff := make([]byte, 1024*1024) chkchan := ReadSeqFileChunk( opt.Source(), @@ -250,7 +250,7 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e out.WaitAndClose() }() - newIter := out.SortBatches().Rebatch(opt.BatchSize()) + newIter := out.SortBatches() log.Debugln("Full file batch mode : ", opt.FullFileBatch()) diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index 64d37b8..85f8088 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -310,7 +310,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e nworker := opt.ParallelWorkers() - buff := make([]byte, 1024*1024*1024) + buff := make([]byte, 1024*1024) chkchan := ReadSeqFileChunk( opt.Source(), @@ -332,7 +332,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e out.WaitAndClose() }() - newIter := out.SortBatches().Rebatch(opt.BatchSize()) + newIter := out.SortBatches() log.Debugln("Full file batch mode : ", opt.FullFileBatch()) diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 5b899e1..2f0d9ec 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -126,7 +126,6 @@ func WriteFasta(iterator obiiter.IBioSequence, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) - iterator = iterator.Rebatch(opt.BatchSize()) file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) newIter := obiiter.MakeIBioSequence() @@ -142,7 +141,7 @@ func WriteFasta(iterator obiiter.IBioSequence, go func() { newIter.WaitAndClose() close(chunkchan) - log.Warnf("Writing fasta file done") + log.Debugf("Writing fasta file done") }() ff := func(iterator obiiter.IBioSequence) { diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index a09f2f2..c79ad5a 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -97,7 +97,7 @@ func WriteFastq(iterator obiiter.IBioSequence, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) - iterator = iterator.Rebatch(opt.BatchSize()) + iterator = iterator file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index f1b261a..d6dec7e 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -223,7 +223,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, opt := MakeOptions(options) // entry_channel := make(chan _FileChunk) - buff := make([]byte, 1024*1024*1024*256) + buff := make([]byte, 1024*1024*512) entry_channel := ReadSeqFileChunk( opt.Source(), diff --git a/pkg/obiformats/seqfile_chunk_read.go b/pkg/obiformats/seqfile_chunk_read.go index fd428bd..ed214ce 100644 --- a/pkg/obiformats/seqfile_chunk_read.go +++ b/pkg/obiformats/seqfile_chunk_read.go @@ -46,6 +46,8 @@ func ReadSeqFileChunk( chunk_channel := make(ChannelSeqFileChunk) + _FileChunkSize := len(buff) + go func() { size := 0 l := 0 diff --git a/pkg/obiformats/seqfile_chunk_write.go b/pkg/obiformats/seqfile_chunk_write.go index acc566a..5752788 100644 --- a/pkg/obiformats/seqfile_chunk_write.go +++ b/pkg/obiformats/seqfile_chunk_write.go @@ -44,7 +44,7 @@ func WriteSeqFileChunk( } obiiter.UnregisterPipe() - log.Warnf("The writer has been closed") + log.Debugf("The writer has been closed") }() return chunk_channel diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index a9f28aa..f6e43fe 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -464,6 +464,39 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { return newIter } +func (iterator IBioSequence) FilterEmpty() IBioSequence { + + newIter := MakeIBioSequence() + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + order := 0 + iterator = iterator.SortBatches() + + for iterator.Next() { + seqs := iterator.Get() + lc := seqs.Len() + + if lc > 0 { + newIter.Push(seqs.Reorder(order)) + order++ + } + } + + newIter.Done() + }() + + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + + return newIter +} func (iterator IBioSequence) Recycle() { log.Debugln("Start recycling of Bioseq objects") diff --git a/pkg/obiiter/paired.go b/pkg/obiiter/paired.go index d8c0574..5fc2d12 100644 --- a/pkg/obiiter/paired.go +++ b/pkg/obiiter/paired.go @@ -1,6 +1,7 @@ package obiiter import ( + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" log "github.com/sirupsen/logrus" ) @@ -40,8 +41,8 @@ func (iter IBioSequence) PairTo(p IBioSequence) IBioSequence { newIter := MakeIBioSequence() - iter = iter.SortBatches() - p = p.SortBatches() + iter = iter.SortBatches().Rebatch(obioptions.CLIBatchSize()) + p = p.SortBatches().Rebatch(obioptions.CLIBatchSize()) newIter.Add(1) diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 8972ca3..c208bca 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,7 +7,7 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "c0c1803" +var _Commit = "f83032e" var _Version = "Release 4.2.0" // Version returns the version of the obitools package.