mirror of
https://github.com/metabarcoding/obitools4.git
synced 2026-03-25 05:20:52 +00:00
refactor: replace fixed batch size with dynamic flushing based on count and memory
Replace the old fixed batch-size mechanism in Distribute with a dynamic strategy that flushes batches when either BatchSizeMax() sequences or BatchMem() bytes are reached per key. This aligns with the RebatchBySize strategy and removes the optional sizes parameter. Also update related code: simplify Lua wrapper to accept optional capacity, and fix buffer growth logic in worker.go using slices.Grow correctly. Remove unused BatchSize() usage from obidistribute.
This commit is contained in:
@@ -57,34 +57,21 @@ func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Distribute organizes the biosequences from the iterator into batches
|
// Distribute organizes the biosequences from the iterator into batches
|
||||||
// based on the provided classifier and batch sizes. It returns an
|
// based on the provided classifier. It returns an IDistribute instance
|
||||||
// IDistribute instance that manages the distribution of the sequences.
|
// that manages the distribution of the sequences.
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Batches are flushed when either BatchSizeMax() sequences or BatchMem()
|
||||||
// - class: A pointer to a BioSequenceClassifier used to classify
|
// bytes are accumulated per key, mirroring the RebatchBySize strategy.
|
||||||
// the biosequences during distribution.
|
func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier) IDistribute {
|
||||||
// - sizes: Optional integer values specifying the batch size. If
|
maxCount := obidefault.BatchSizeMax()
|
||||||
// no sizes are provided, a default batch size of 5000 is used.
|
maxBytes := obidefault.BatchMem()
|
||||||
//
|
|
||||||
// Returns:
|
|
||||||
// An IDistribute instance that contains the outputs of the
|
|
||||||
// classified biosequences, a channel for new data notifications,
|
|
||||||
// and the classifier used for distribution. The method operates
|
|
||||||
// asynchronously, processing the sequences in separate goroutines.
|
|
||||||
// It ensures that the outputs are closed and cleaned up once
|
|
||||||
// processing is complete.
|
|
||||||
func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute {
|
|
||||||
batchsize := obidefault.BatchSize()
|
|
||||||
|
|
||||||
outputs := make(map[int]IBioSequence, 100)
|
outputs := make(map[int]IBioSequence, 100)
|
||||||
slices := make(map[int]*obiseq.BioSequenceSlice, 100)
|
slices := make(map[int]*obiseq.BioSequenceSlice, 100)
|
||||||
|
bufBytes := make(map[int]int, 100)
|
||||||
orders := make(map[int]int, 100)
|
orders := make(map[int]int, 100)
|
||||||
news := make(chan int)
|
news := make(chan int)
|
||||||
|
|
||||||
if len(sizes) > 0 {
|
|
||||||
batchsize = sizes[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
jobDone := sync.WaitGroup{}
|
jobDone := sync.WaitGroup{}
|
||||||
lock := sync.Mutex{}
|
lock := sync.Mutex{}
|
||||||
|
|
||||||
@@ -115,6 +102,7 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz
|
|||||||
slice = &s
|
slice = &s
|
||||||
slices[key] = slice
|
slices[key] = slice
|
||||||
orders[key] = 0
|
orders[key] = 0
|
||||||
|
bufBytes[key] = 0
|
||||||
|
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
outputs[key] = MakeIBioSequence()
|
outputs[key] = MakeIBioSequence()
|
||||||
@@ -123,14 +111,20 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz
|
|||||||
news <- key
|
news <- key
|
||||||
}
|
}
|
||||||
|
|
||||||
*slice = append(*slice, s)
|
sz := s.MemorySize()
|
||||||
|
countFull := maxCount > 0 && len(*slice) >= maxCount
|
||||||
if len(*slice) == batchsize {
|
memFull := maxBytes > 0 && bufBytes[key]+sz > maxBytes && len(*slice) > 0
|
||||||
|
if countFull || memFull {
|
||||||
outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice))
|
outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice))
|
||||||
orders[key]++
|
orders[key]++
|
||||||
s := obiseq.MakeBioSequenceSlice()
|
s := obiseq.MakeBioSequenceSlice()
|
||||||
slices[key] = &s
|
slices[key] = &s
|
||||||
|
slice = &s
|
||||||
|
bufBytes[key] = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*slice = append(*slice, s)
|
||||||
|
bufBytes[key] += sz
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -31,7 +31,8 @@ func obiseqslice2Lua(interpreter *lua.LState,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newObiSeqSlice(luaState *lua.LState) int {
|
func newObiSeqSlice(luaState *lua.LState) int {
|
||||||
seqslice := obiseq.NewBioSequenceSlice()
|
capacity := luaState.OptInt(1, 0)
|
||||||
|
seqslice := obiseq.NewBioSequenceSlice(capacity)
|
||||||
luaState.Push(obiseqslice2Lua(luaState, seqslice))
|
luaState.Push(obiseqslice2Lua(luaState, seqslice))
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -104,11 +104,11 @@ func SeqToSliceWorker(worker SeqWorker,
|
|||||||
for _, s := range input {
|
for _, s := range input {
|
||||||
r, err := worker(s)
|
r, err := worker(s)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if i+len(r) > cap(output) {
|
||||||
|
output = slices.Grow(output[:i], len(r))
|
||||||
|
output = output[:cap(output)]
|
||||||
|
}
|
||||||
for _, rs := range r {
|
for _, rs := range r {
|
||||||
if i == len(output) {
|
|
||||||
output = slices.Grow(output, cap(output))
|
|
||||||
output = output[:cap(output)]
|
|
||||||
}
|
|
||||||
output[i] = rs
|
output[i] = rs
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,8 +46,7 @@ func CLIDistributeSequence(sequences obiiter.IBioSequence) {
|
|||||||
formater = obiformats.WriteSequencesToFile
|
formater = obiformats.WriteSequencesToFile
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatcher := sequences.Distribute(CLISequenceClassifier(),
|
dispatcher := sequences.Distribute(CLISequenceClassifier())
|
||||||
obidefault.BatchSize())
|
|
||||||
|
|
||||||
obiformats.WriterDispatcher(CLIFileNamePattern(),
|
obiformats.WriterDispatcher(CLIFileNamePattern(),
|
||||||
dispatcher, formater, opts...,
|
dispatcher, formater, opts...,
|
||||||
|
|||||||
Reference in New Issue
Block a user