diff --git a/pkg/obiiter/distribute.go b/pkg/obiiter/distribute.go index 381f3bd..063b710 100644 --- a/pkg/obiiter/distribute.go +++ b/pkg/obiiter/distribute.go @@ -57,34 +57,21 @@ func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier { } // Distribute organizes the biosequences from the iterator into batches -// based on the provided classifier and batch sizes. It returns an -// IDistribute instance that manages the distribution of the sequences. +// based on the provided classifier. It returns an IDistribute instance +// that manages the distribution of the sequences. // -// Parameters: -// - class: A pointer to a BioSequenceClassifier used to classify -// the biosequences during distribution. -// - sizes: Optional integer values specifying the batch size. If -// no sizes are provided, a default batch size of 5000 is used. -// -// Returns: -// An IDistribute instance that contains the outputs of the -// classified biosequences, a channel for new data notifications, -// and the classifier used for distribution. The method operates -// asynchronously, processing the sequences in separate goroutines. -// It ensures that the outputs are closed and cleaned up once -// processing is complete. -func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute { - batchsize := obidefault.BatchSize() +// Batches are flushed when either BatchSizeMax() sequences or BatchMem() +// bytes are accumulated per key, mirroring the RebatchBySize strategy. +func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier) IDistribute { + maxCount := obidefault.BatchSizeMax() + maxBytes := obidefault.BatchMem() outputs := make(map[int]IBioSequence, 100) slices := make(map[int]*obiseq.BioSequenceSlice, 100) + bufBytes := make(map[int]int, 100) orders := make(map[int]int, 100) news := make(chan int) - if len(sizes) > 0 { - batchsize = sizes[0] - } - jobDone := sync.WaitGroup{} lock := sync.Mutex{} @@ -115,6 +102,7 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz slice = &s slices[key] = slice orders[key] = 0 + bufBytes[key] = 0 lock.Lock() outputs[key] = MakeIBioSequence() @@ -123,14 +111,20 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz news <- key } - *slice = append(*slice, s) - - if len(*slice) == batchsize { + sz := s.MemorySize() + countFull := maxCount > 0 && len(*slice) >= maxCount + memFull := maxBytes > 0 && bufBytes[key]+sz > maxBytes && len(*slice) > 0 + if countFull || memFull { outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice)) orders[key]++ s := obiseq.MakeBioSequenceSlice() slices[key] = &s + slice = &s + bufBytes[key] = 0 } + + *slice = append(*slice, s) + bufBytes[key] += sz } } diff --git a/pkg/obilua/obiseqslice.go b/pkg/obilua/obiseqslice.go index 6f6f443..dc0395d 100644 --- a/pkg/obilua/obiseqslice.go +++ b/pkg/obilua/obiseqslice.go @@ -31,7 +31,8 @@ func obiseqslice2Lua(interpreter *lua.LState, } func newObiSeqSlice(luaState *lua.LState) int { - seqslice := obiseq.NewBioSequenceSlice() + capacity := luaState.OptInt(1, 0) + seqslice := obiseq.NewBioSequenceSlice(capacity) luaState.Push(obiseqslice2Lua(luaState, seqslice)) return 1 } diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 98ffb57..947634b 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -3,7 +3,7 @@ package obioptions // Version is automatically updated by the Makefile from version.txt // The patch number (third digit) is incremented on each push to the repository -var _Version = "Release 4.4.28" +var _Version = "Release 4.4.29" // Version returns the version of the obitools package. // diff --git a/pkg/obiseq/worker.go b/pkg/obiseq/worker.go index 3202bcc..c793836 100644 --- a/pkg/obiseq/worker.go +++ b/pkg/obiseq/worker.go @@ -104,11 +104,11 @@ func SeqToSliceWorker(worker SeqWorker, for _, s := range input { r, err := worker(s) if err == nil { + if i+len(r) > cap(output) { + output = slices.Grow(output[:i], len(r)) + output = output[:cap(output)] + } for _, rs := range r { - if i == len(output) { - output = slices.Grow(output, cap(output)) - output = output[:cap(output)] - } output[i] = rs i++ } diff --git a/pkg/obitools/obidistribute/distribute.go b/pkg/obitools/obidistribute/distribute.go index 6d23c53..5d033e7 100644 --- a/pkg/obitools/obidistribute/distribute.go +++ b/pkg/obitools/obidistribute/distribute.go @@ -46,8 +46,7 @@ func CLIDistributeSequence(sequences obiiter.IBioSequence) { formater = obiformats.WriteSequencesToFile } - dispatcher := sequences.Distribute(CLISequenceClassifier(), - obidefault.BatchSize()) + dispatcher := sequences.Distribute(CLISequenceClassifier()) obiformats.WriterDispatcher(CLIFileNamePattern(), dispatcher, formater, opts..., diff --git a/version.txt b/version.txt index 01d272b..33cf5f3 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -4.4.28 +4.4.29