From a2b26712b2614000e8adcdbe30ec5e13b93cf7f5 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 16 Mar 2026 22:06:14 +0100 Subject: [PATCH] refactor: replace fixed batch size with dynamic flushing based on count and memory Replace the old fixed batch-size mechanism in Distribute with a dynamic strategy that flushes batches when either BatchSizeMax() sequences or BatchMem() bytes are reached per key. This aligns with the RebatchBySize strategy and removes the optional sizes parameter. Also update related code: simplify Lua wrapper to accept optional capacity, and fix buffer growth logic in worker.go using slices.Grow correctly. Remove unused BatchSize() usage from obidistribute. --- pkg/obiiter/distribute.go | 42 ++++++++++-------------- pkg/obilua/obiseqslice.go | 3 +- pkg/obiseq/worker.go | 8 ++--- pkg/obitools/obidistribute/distribute.go | 3 +- 4 files changed, 25 insertions(+), 31 deletions(-) diff --git a/pkg/obiiter/distribute.go b/pkg/obiiter/distribute.go index 381f3bd..063b710 100644 --- a/pkg/obiiter/distribute.go +++ b/pkg/obiiter/distribute.go @@ -57,34 +57,21 @@ func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier { } // Distribute organizes the biosequences from the iterator into batches -// based on the provided classifier and batch sizes. It returns an -// IDistribute instance that manages the distribution of the sequences. +// based on the provided classifier. It returns an IDistribute instance +// that manages the distribution of the sequences. // -// Parameters: -// - class: A pointer to a BioSequenceClassifier used to classify -// the biosequences during distribution. -// - sizes: Optional integer values specifying the batch size. If -// no sizes are provided, a default batch size of 5000 is used. -// -// Returns: -// An IDistribute instance that contains the outputs of the -// classified biosequences, a channel for new data notifications, -// and the classifier used for distribution. The method operates -// asynchronously, processing the sequences in separate goroutines. -// It ensures that the outputs are closed and cleaned up once -// processing is complete. -func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute { - batchsize := obidefault.BatchSize() +// Batches are flushed when either BatchSizeMax() sequences or BatchMem() +// bytes are accumulated per key, mirroring the RebatchBySize strategy. +func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier) IDistribute { + maxCount := obidefault.BatchSizeMax() + maxBytes := obidefault.BatchMem() outputs := make(map[int]IBioSequence, 100) slices := make(map[int]*obiseq.BioSequenceSlice, 100) + bufBytes := make(map[int]int, 100) orders := make(map[int]int, 100) news := make(chan int) - if len(sizes) > 0 { - batchsize = sizes[0] - } - jobDone := sync.WaitGroup{} lock := sync.Mutex{} @@ -115,6 +102,7 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz slice = &s slices[key] = slice orders[key] = 0 + bufBytes[key] = 0 lock.Lock() outputs[key] = MakeIBioSequence() @@ -123,14 +111,20 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz news <- key } - *slice = append(*slice, s) - - if len(*slice) == batchsize { + sz := s.MemorySize() + countFull := maxCount > 0 && len(*slice) >= maxCount + memFull := maxBytes > 0 && bufBytes[key]+sz > maxBytes && len(*slice) > 0 + if countFull || memFull { outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice)) orders[key]++ s := obiseq.MakeBioSequenceSlice() slices[key] = &s + slice = &s + bufBytes[key] = 0 } + + *slice = append(*slice, s) + bufBytes[key] += sz } } diff --git a/pkg/obilua/obiseqslice.go b/pkg/obilua/obiseqslice.go index 6f6f443..dc0395d 100644 --- a/pkg/obilua/obiseqslice.go +++ b/pkg/obilua/obiseqslice.go @@ -31,7 +31,8 @@ func obiseqslice2Lua(interpreter *lua.LState, } func newObiSeqSlice(luaState *lua.LState) int { - seqslice := obiseq.NewBioSequenceSlice() + capacity := luaState.OptInt(1, 0) + seqslice := obiseq.NewBioSequenceSlice(capacity) luaState.Push(obiseqslice2Lua(luaState, seqslice)) return 1 } diff --git a/pkg/obiseq/worker.go b/pkg/obiseq/worker.go index 3202bcc..c793836 100644 --- a/pkg/obiseq/worker.go +++ b/pkg/obiseq/worker.go @@ -104,11 +104,11 @@ func SeqToSliceWorker(worker SeqWorker, for _, s := range input { r, err := worker(s) if err == nil { + if i+len(r) > cap(output) { + output = slices.Grow(output[:i], len(r)) + output = output[:cap(output)] + } for _, rs := range r { - if i == len(output) { - output = slices.Grow(output, cap(output)) - output = output[:cap(output)] - } output[i] = rs i++ } diff --git a/pkg/obitools/obidistribute/distribute.go b/pkg/obitools/obidistribute/distribute.go index 6d23c53..5d033e7 100644 --- a/pkg/obitools/obidistribute/distribute.go +++ b/pkg/obitools/obidistribute/distribute.go @@ -46,8 +46,7 @@ func CLIDistributeSequence(sequences obiiter.IBioSequence) { formater = obiformats.WriteSequencesToFile } - dispatcher := sequences.Distribute(CLISequenceClassifier(), - obidefault.BatchSize()) + dispatcher := sequences.Distribute(CLISequenceClassifier()) obiformats.WriterDispatcher(CLIFileNamePattern(), dispatcher, formater, opts...,