Files
obitools4/pkg/obitools/obidistribute/distribute.go
T
Eric Coissac a2b26712b2 refactor: replace fixed batch size with dynamic flushing based on count and memory
Replace the old fixed batch-size mechanism in Distribute with a dynamic strategy that flushes batches when either BatchSizeMax() sequences or BatchMem() bytes are reached per key. This aligns with the RebatchBySize strategy and removes the optional sizes parameter. Also update related code: simplify Lua wrapper to accept optional capacity, and fix buffer growth logic in worker.go using slices.Grow correctly. Remove unused BatchSize() usage from obidistribute.
2026-03-16 22:06:44 +01:00

56 lines
1.7 KiB
Go

package obidistribute
import (
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
)
// CLIDistributeSequence splits the input sequences into several output files.
//
// The iterator is passed through sequences.Distribute with the classifier
// returned by CLISequenceClassifier. The resulting dispatcher is then handed to
// obiformats.WriterDispatcher together with the file-name pattern from
// CLIFileNamePattern.
//
// Writer options are derived from the command-line configuration:
//   - header format (obiconvert.CLIOutputFastHeaderFormat): "obi" selects
//     OBI headers; any other value, "json" included, selects JSON headers.
//     The choice is logged.
//   - parallel writers: a quarter of obidefault.ParallelWorkers(), but
//     never fewer than 2.
//   - batch size, append mode and compression come from
//     obidefault.BatchSize(), CLIAppendSequences() and
//     obidefault.CompressOutput() respectively.
//
// The writer function is chosen from obiconvert.CLIOutputFormat(): "fastq"
// and "fasta" map to their dedicated writers, and anything else falls back to
// obiformats.WriteSequencesToFile.
func CLIDistributeSequence(sequences obiiter.IBioSequence) {
	// Only "obi" departs from the JSON default, so the explicit "json" case
	// and the fallback share a single arm.
	var headerOption obiformats.WithOption
	switch obiconvert.CLIOutputFastHeaderFormat() {
	case "obi":
		log.Println("On output use OBI headers")
		headerOption = obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqOBIHeader)
	default:
		log.Println("On output use JSON headers")
		headerOption = obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)
	}

	// Writers get a quarter of the configured parallelism, with a floor of two.
	writers := 2
	if quarter := obidefault.ParallelWorkers() / 4; quarter > writers {
		writers = quarter
	}

	// The option order matches the original append sequence: header format
	// first, then workers, batch size, append mode, compression.
	options := []obiformats.WithOption{
		headerOption,
		obiformats.OptionsParallelWorkers(writers),
		obiformats.OptionsBatchSize(obidefault.BatchSize()),
		obiformats.OptionsAppendFile(CLIAppendSequences()),
		obiformats.OptionsCompressed(obidefault.CompressOutput()),
	}

	// Dedicated writers for the explicitly supported formats; every other
	// format name uses the generic writer.
	writerByFormat := map[string]obiformats.SequenceBatchWriterToFile{
		"fastq": obiformats.WriteFastqToFile,
		"fasta": obiformats.WriteFastaToFile,
	}
	writeToFile, found := writerByFormat[obiconvert.CLIOutputFormat()]
	if !found {
		writeToFile = obiformats.WriteSequencesToFile
	}

	classified := sequences.Distribute(CLISequenceClassifier())
	obiformats.WriterDispatcher(CLIFileNamePattern(), classified, writeToFile, options...)
}