refactor: replace single batch size with min/max bounds and memory limits

Introduce separate _BatchSize (min) and _BatchSizeMax (max) package-level variables to replace the single _BatchSize variable. Update RebatchBySize to accept both maxBytes and maxCount parameters, flushing a batch as soon as either limit would be exceeded. Set the default minimum batch size to 1, the maximum to 2000, and the memory limit to 128 MB. Update the CLI options and sequence_reader.go accordingly.
This commit is contained in:
Eric Coissac
2026-03-13 15:07:31 +01:00
parent 40769bf827
commit 1e1f575d1c
4 changed files with 38 additions and 16 deletions

View File

@@ -1,6 +1,12 @@
package obidefault package obidefault
var _BatchSize = 2000 // _BatchSize is the minimum number of sequences per batch (floor).
// NOTE(review): RebatchBySize takes (maxBytes, maxCount) and has no minSeqs
// parameter; confirm where this floor is actually applied.
var _BatchSize = 1
// _BatchSizeMax is the maximum number of sequences per batch (ceiling).
// A batch is flushed when this count is reached regardless of memory usage.
var _BatchSizeMax = 2000
// SetBatchSize sets the size of the sequence batches. // SetBatchSize sets the size of the sequence batches.
// //
@@ -25,10 +31,19 @@ func BatchSizePtr() *int {
return &_BatchSize return &_BatchSize
} }
// BatchSizeMax returns the current value of the package-level _BatchSizeMax
// variable, i.e. the maximum number of sequences per batch.
func BatchSizeMax() int {
return _BatchSizeMax
}
// BatchSizeMaxPtr returns a pointer to the package-level _BatchSizeMax
// variable, so that callers can update the maximum batch size in place
// (for example when binding it to a command-line option).
func BatchSizeMaxPtr() *int {
return &_BatchSizeMax
}
// _BatchMem holds the maximum cumulative memory (in bytes) per batch when // _BatchMem holds the maximum cumulative memory (in bytes) per batch when
// memory-based batching is requested. A value of 0 disables memory-based // memory-based batching is requested. A value of 0 disables memory-based
// batching and falls back to count-based batching. // batching and falls back to count-based batching.
var _BatchMem = 0 var _BatchMem = 128 * 1024 * 1024 // 128 MB default; set to 0 to disable
var _BatchMemStr = "" var _BatchMemStr = ""
// SetBatchMem sets the memory budget per batch in bytes. // SetBatchMem sets the memory budget per batch in bytes.

View File

@@ -444,12 +444,17 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence {
return newIter return newIter
} }
// RebatchBySize reorganises the stream into batches whose cumulative estimated // RebatchBySize reorganises the stream into batches bounded by two independent
// memory footprint does not exceed maxBytes. A single sequence larger than // upper limits: maxCount (max number of sequences) and maxBytes (max cumulative
// maxBytes is emitted alone rather than dropped. minSeqs sets a lower bound on // estimated memory). A batch is flushed as soon as either limit would be
// batch size (in number of sequences) so that very large sequences still form // exceeded. A single sequence larger than maxBytes is always emitted alone.
// reasonably-sized work units; use 1 to disable. // Passing 0 for a limit disables that constraint; if both are 0 it falls back
func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequence { // to Rebatch(obidefault.BatchSizeMax()).
func (iterator IBioSequence) RebatchBySize(maxBytes int, maxCount int) IBioSequence {
if maxBytes <= 0 && maxCount <= 0 {
return iterator.Rebatch(obidefault.BatchSizeMax())
}
newIter := MakeIBioSequence() newIter := MakeIBioSequence()
newIter.Add(1) newIter.Add(1)
@@ -479,9 +484,9 @@ func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequen
source = seqs.Source() source = seqs.Source()
for _, s := range seqs.Slice() { for _, s := range seqs.Slice() {
sz := s.MemorySize() sz := s.MemorySize()
// flush before adding if it would overflow, but only if countFull := maxCount > 0 && len(buffer) >= maxCount
// we already meet the minimum sequence count memFull := maxBytes > 0 && bufBytes+sz > maxBytes && len(buffer) > 0
if bufBytes+sz > maxBytes && len(buffer) >= minSeqs { if countFull || memFull {
flush() flush()
} }
buffer = append(buffer, s) buffer = append(buffer, s)

View File

@@ -56,11 +56,15 @@ func RegisterGlobalOptions(options *getoptions.GetOpt) {
options.IntVar(obidefault.BatchSizePtr(), "batch-size", obidefault.BatchSize(), options.IntVar(obidefault.BatchSizePtr(), "batch-size", obidefault.BatchSize(),
options.GetEnv("OBIBATCHSIZE"), options.GetEnv("OBIBATCHSIZE"),
options.Description("Number of sequence per batch for paralelle processing")) options.Description("Minimum number of sequences per batch (floor, default 1)"))
options.IntVar(obidefault.BatchSizeMaxPtr(), "batch-size-max", obidefault.BatchSizeMax(),
options.GetEnv("OBIBATCHSIZEMAX"),
options.Description("Maximum number of sequences per batch (ceiling, default 2000)"))
options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "", options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "",
options.GetEnv("OBIBATCHMEM"), options.GetEnv("OBIBATCHMEM"),
options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G). Overrides --batch-size when set.")) options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G; default: 128M). Set to 0 to disable."))
options.Bool("solexa", false, options.Bool("solexa", false,
options.GetEnv("OBISOLEXA"), options.GetEnv("OBISOLEXA"),

View File

@@ -214,9 +214,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) {
iterator = iterator.Speed("Reading sequences") iterator = iterator.Speed("Reading sequences")
if obidefault.BatchMem() > 0 { iterator = iterator.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax())
iterator = iterator.RebatchBySize(obidefault.BatchMem(), 1)
}
return iterator, nil return iterator, nil
} }