From 1e1f575d1c14f57fec30ed7331ff7a4c16548646 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 13 Mar 2026 15:07:31 +0100 Subject: [PATCH] refactor: replace single batch size with min/max bounds and memory limits Introduce separate _BatchSize (min) and _BatchSizeMax (max) constants to replace the single _BatchSize variable. Update RebatchBySize to accept both maxBytes and maxCount parameters, flushing when either limit is exceeded. Set default batch size min to 1, max to 2000, and memory limit to 128 MB. Update CLI options and sequence_reader.go accordingly. --- pkg/obidefault/batch.go | 19 ++++++++++++++++-- pkg/obiiter/batchiterator.go | 23 +++++++++++++--------- pkg/obioptions/options.go | 8 ++++++-- pkg/obitools/obiconvert/sequence_reader.go | 4 +--- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/pkg/obidefault/batch.go b/pkg/obidefault/batch.go index d2cc10b..83e9d3a 100644 --- a/pkg/obidefault/batch.go +++ b/pkg/obidefault/batch.go @@ -1,6 +1,12 @@ package obidefault -var _BatchSize = 2000 +// _BatchSize is the minimum number of sequences per batch (floor). +// Used as the minSeqs argument to RebatchBySize. +var _BatchSize = 1 + +// _BatchSizeMax is the maximum number of sequences per batch (ceiling). +// A batch is flushed when this count is reached regardless of memory usage. +var _BatchSizeMax = 2000 // SetBatchSize sets the size of the sequence batches. // @@ -25,10 +31,19 @@ func BatchSizePtr() *int { return &_BatchSize } +// BatchSizeMax returns the maximum number of sequences per batch. +func BatchSizeMax() int { + return _BatchSizeMax +} + +func BatchSizeMaxPtr() *int { + return &_BatchSizeMax +} + // _BatchMem holds the maximum cumulative memory (in bytes) per batch when // memory-based batching is requested. A value of 0 disables memory-based // batching and falls back to count-based batching. -var _BatchMem = 0 +var _BatchMem = 128 * 1024 * 1024 // 128 MB default; set to 0 to disable var _BatchMemStr = "" // SetBatchMem sets the memory budget per batch in bytes. diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 9e25d65..101717c 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -444,12 +444,17 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { return newIter } -// RebatchBySize reorganises the stream into batches whose cumulative estimated -// memory footprint does not exceed maxBytes. A single sequence larger than -// maxBytes is emitted alone rather than dropped. minSeqs sets a lower bound on -// batch size (in number of sequences) so that very large sequences still form -// reasonably-sized work units; use 1 to disable. -func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequence { +// RebatchBySize reorganises the stream into batches bounded by two independent +// upper limits: maxCount (max number of sequences) and maxBytes (max cumulative +// estimated memory). A batch is flushed as soon as either limit would be +// exceeded. A single sequence larger than maxBytes is always emitted alone. +// Passing 0 for a limit disables that constraint; if both are 0 it falls back +// to Rebatch(obidefault.BatchSizeMax()). +func (iterator IBioSequence) RebatchBySize(maxBytes int, maxCount int) IBioSequence { + if maxBytes <= 0 && maxCount <= 0 { + return iterator.Rebatch(obidefault.BatchSizeMax()) + } + newIter := MakeIBioSequence() newIter.Add(1) @@ -479,9 +484,9 @@ func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequen source = seqs.Source() for _, s := range seqs.Slice() { sz := s.MemorySize() - // flush before adding if it would overflow, but only if - // we already meet the minimum sequence count - if bufBytes+sz > maxBytes && len(buffer) >= minSeqs { + countFull := maxCount > 0 && len(buffer) >= maxCount + memFull := maxBytes > 0 && bufBytes+sz > maxBytes && len(buffer) > 0 + if countFull || memFull { flush() } buffer = append(buffer, s) diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 5a9878d..5ebb471 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -56,11 +56,15 @@ func RegisterGlobalOptions(options *getoptions.GetOpt) { options.IntVar(obidefault.BatchSizePtr(), "batch-size", obidefault.BatchSize(), options.GetEnv("OBIBATCHSIZE"), - options.Description("Number of sequence per batch for paralelle processing")) + options.Description("Minimum number of sequences per batch (floor, default 1)")) + + options.IntVar(obidefault.BatchSizeMaxPtr(), "batch-size-max", obidefault.BatchSizeMax(), + options.GetEnv("OBIBATCHSIZEMAX"), + options.Description("Maximum number of sequences per batch (ceiling, default 2000)")) options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "", options.GetEnv("OBIBATCHMEM"), - options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G). Overrides --batch-size when set.")) + options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G; default: 128M). Set to 0 to disable.")) options.Bool("solexa", false, options.GetEnv("OBISOLEXA"), diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index a5e27bc..cbda802 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -214,9 +214,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { iterator = iterator.Speed("Reading sequences") - if obidefault.BatchMem() > 0 { - iterator = iterator.RebatchBySize(obidefault.BatchMem(), 1) - } + iterator = iterator.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax()) return iterator, nil }