diff --git a/pkg/obidefault/batch.go b/pkg/obidefault/batch.go index d2cc10b..83e9d3a 100644 --- a/pkg/obidefault/batch.go +++ b/pkg/obidefault/batch.go @@ -1,6 +1,12 @@ package obidefault -var _BatchSize = 2000 +// _BatchSize is the minimum number of sequences per batch (floor). +// Used as the minSeqs argument to RebatchBySize. +var _BatchSize = 1 + +// _BatchSizeMax is the maximum number of sequences per batch (ceiling). +// A batch is flushed when this count is reached regardless of memory usage. +var _BatchSizeMax = 2000 // SetBatchSize sets the size of the sequence batches. // @@ -25,10 +31,19 @@ func BatchSizePtr() *int { return &_BatchSize } +// BatchSizeMax returns the maximum number of sequences per batch. +func BatchSizeMax() int { + return _BatchSizeMax +} + +func BatchSizeMaxPtr() *int { + return &_BatchSizeMax +} + // _BatchMem holds the maximum cumulative memory (in bytes) per batch when // memory-based batching is requested. A value of 0 disables memory-based // batching and falls back to count-based batching. -var _BatchMem = 0 +var _BatchMem = 128 * 1024 * 1024 // 128 MB default; set to 0 to disable var _BatchMemStr = "" // SetBatchMem sets the memory budget per batch in bytes. diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 9e25d65..101717c 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -444,12 +444,17 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { return newIter } -// RebatchBySize reorganises the stream into batches whose cumulative estimated -// memory footprint does not exceed maxBytes. A single sequence larger than -// maxBytes is emitted alone rather than dropped. minSeqs sets a lower bound on -// batch size (in number of sequences) so that very large sequences still form -// reasonably-sized work units; use 1 to disable. -func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequence { +// RebatchBySize reorganises the stream into batches bounded by two independent +// upper limits: maxCount (max number of sequences) and maxBytes (max cumulative +// estimated memory). A batch is flushed as soon as either limit would be +// exceeded. A single sequence larger than maxBytes is always emitted alone. +// Passing 0 for a limit disables that constraint; if both are 0 it falls back +// to Rebatch(obidefault.BatchSizeMax()). +func (iterator IBioSequence) RebatchBySize(maxBytes int, maxCount int) IBioSequence { + if maxBytes <= 0 && maxCount <= 0 { + return iterator.Rebatch(obidefault.BatchSizeMax()) + } + newIter := MakeIBioSequence() newIter.Add(1) @@ -479,9 +484,9 @@ func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequen source = seqs.Source() for _, s := range seqs.Slice() { sz := s.MemorySize() - // flush before adding if it would overflow, but only if - // we already meet the minimum sequence count - if bufBytes+sz > maxBytes && len(buffer) >= minSeqs { + countFull := maxCount > 0 && len(buffer) >= maxCount + memFull := maxBytes > 0 && bufBytes+sz > maxBytes && len(buffer) > 0 + if countFull || memFull { flush() } buffer = append(buffer, s) diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 5a9878d..5ebb471 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -56,11 +56,15 @@ func RegisterGlobalOptions(options *getoptions.GetOpt) { options.IntVar(obidefault.BatchSizePtr(), "batch-size", obidefault.BatchSize(), options.GetEnv("OBIBATCHSIZE"), - options.Description("Number of sequence per batch for paralelle processing")) + options.Description("Minimum number of sequences per batch (floor, default 1)")) + + options.IntVar(obidefault.BatchSizeMaxPtr(), "batch-size-max", obidefault.BatchSizeMax(), + options.GetEnv("OBIBATCHSIZEMAX"), + options.Description("Maximum number of sequences per batch (ceiling, default 2000)")) options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "", options.GetEnv("OBIBATCHMEM"), - options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G). Overrides --batch-size when set.")) + options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G; default: 128M). Set to 0 to disable.")) options.Bool("solexa", false, options.GetEnv("OBISOLEXA"), diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index a5e27bc..cbda802 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -214,9 +214,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { iterator = iterator.Speed("Reading sequences") - if obidefault.BatchMem() > 0 { - iterator = iterator.RebatchBySize(obidefault.BatchMem(), 1) - } + iterator = iterator.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax()) return iterator, nil }