Optimize memory for readers and writers

This commit is contained in:
Eric Coissac
2024-08-05 10:48:28 +02:00
parent f83032e643
commit 886b5d9a96
11 changed files with 48 additions and 13 deletions

View File

@ -187,7 +187,7 @@ func _ParseEmblFile(
func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options) opt := MakeOptions(options)
buff := make([]byte, 1024*1024*1024*256) buff := make([]byte, 1024*1024*512)
entry_channel := ReadSeqFileChunk( entry_channel := ReadSeqFileChunk(
opt.Source(), opt.Source(),

View File

@ -232,7 +232,7 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
nworker := opt.ParallelWorkers() nworker := opt.ParallelWorkers()
buff := make([]byte, 1024*1024*1024) buff := make([]byte, 1024*1024)
chkchan := ReadSeqFileChunk( chkchan := ReadSeqFileChunk(
opt.Source(), opt.Source(),
@ -250,7 +250,7 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
out.WaitAndClose() out.WaitAndClose()
}() }()
newIter := out.SortBatches().Rebatch(opt.BatchSize()) newIter := out.SortBatches()
log.Debugln("Full file batch mode : ", opt.FullFileBatch()) log.Debugln("Full file batch mode : ", opt.FullFileBatch())

View File

@ -310,7 +310,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
nworker := opt.ParallelWorkers() nworker := opt.ParallelWorkers()
buff := make([]byte, 1024*1024*1024) buff := make([]byte, 1024*1024)
chkchan := ReadSeqFileChunk( chkchan := ReadSeqFileChunk(
opt.Source(), opt.Source(),
@ -332,7 +332,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
out.WaitAndClose() out.WaitAndClose()
}() }()
newIter := out.SortBatches().Rebatch(opt.BatchSize()) newIter := out.SortBatches()
log.Debugln("Full file batch mode : ", opt.FullFileBatch()) log.Debugln("Full file batch mode : ", opt.FullFileBatch())

View File

@ -126,7 +126,6 @@ func WriteFasta(iterator obiiter.IBioSequence,
options ...WithOption) (obiiter.IBioSequence, error) { options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options) opt := MakeOptions(options)
iterator = iterator.Rebatch(opt.BatchSize())
file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile())
newIter := obiiter.MakeIBioSequence() newIter := obiiter.MakeIBioSequence()
@ -142,7 +141,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
go func() { go func() {
newIter.WaitAndClose() newIter.WaitAndClose()
close(chunkchan) close(chunkchan)
log.Warnf("Writing fasta file done") log.Debugf("Writing fasta file done")
}() }()
ff := func(iterator obiiter.IBioSequence) { ff := func(iterator obiiter.IBioSequence) {

View File

@ -97,7 +97,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
options ...WithOption) (obiiter.IBioSequence, error) { options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options) opt := MakeOptions(options)
iterator = iterator.Rebatch(opt.BatchSize()) iterator = iterator
file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile())

View File

@ -223,7 +223,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence,
opt := MakeOptions(options) opt := MakeOptions(options)
// entry_channel := make(chan _FileChunk) // entry_channel := make(chan _FileChunk)
buff := make([]byte, 1024*1024*1024*256) buff := make([]byte, 1024*1024*512)
entry_channel := ReadSeqFileChunk( entry_channel := ReadSeqFileChunk(
opt.Source(), opt.Source(),

View File

@ -46,6 +46,8 @@ func ReadSeqFileChunk(
chunk_channel := make(ChannelSeqFileChunk) chunk_channel := make(ChannelSeqFileChunk)
_FileChunkSize := len(buff)
go func() { go func() {
size := 0 size := 0
l := 0 l := 0

View File

@ -44,7 +44,7 @@ func WriteSeqFileChunk(
} }
obiiter.UnregisterPipe() obiiter.UnregisterPipe()
log.Warnf("The writer has been closed") log.Debugf("The writer has been closed")
}() }()
return chunk_channel return chunk_channel

View File

@ -464,6 +464,39 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence {
return newIter return newIter
} }
func (iterator IBioSequence) FilterEmpty() IBioSequence {
newIter := MakeIBioSequence()
newIter.Add(1)
go func() {
newIter.WaitAndClose()
}()
go func() {
order := 0
iterator = iterator.SortBatches()
for iterator.Next() {
seqs := iterator.Get()
lc := seqs.Len()
if lc > 0 {
newIter.Push(seqs.Reorder(order))
order++
}
}
newIter.Done()
}()
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}
func (iterator IBioSequence) Recycle() { func (iterator IBioSequence) Recycle() {
log.Debugln("Start recycling of Bioseq objects") log.Debugln("Start recycling of Bioseq objects")

View File

@ -1,6 +1,7 @@
package obiiter package obiiter
import ( import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -40,8 +41,8 @@ func (iter IBioSequence) PairTo(p IBioSequence) IBioSequence {
newIter := MakeIBioSequence() newIter := MakeIBioSequence()
iter = iter.SortBatches() iter = iter.SortBatches().Rebatch(obioptions.CLIBatchSize())
p = p.SortBatches() p = p.SortBatches().Rebatch(obioptions.CLIBatchSize())
newIter.Add(1) newIter.Add(1)

View File

@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version // TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be // corresponds to the last commit, and not the one when the file will be
// commited // commited
var _Commit = "c0c1803" var _Commit = "f83032e"
var _Version = "Release 4.2.0" var _Version = "Release 4.2.0"
// Version returns the version of the obitools package. // Version returns the version of the obitools package.