mirror of
https://github.com/metabarcoding/obitools4.git
synced 2026-03-25 13:30:52 +00:00
Add memory-based batching support
Implement memory-aware batch sizing with --batch-mem CLI option, enabling adaptive batching based on estimated sequence memory footprint. Key changes: - Added _BatchMem and related getters/setters in pkg/obidefault - Implemented RebatchBySize() in pkg/obiter for memory-constrained batching - Added BioSequence.MemorySize() for conservative memory estimation - Integrated batch-mem option in pkg/obioptions with human-readable size parsing (e.g., 128K, 64M, 1G) - Added obiutils.ParseMemSize/FormatMemSize for unit conversion - Enhanced pool GC in pkg/obiseq/pool.go to trigger explicit GC for large slice discards - Updated sequence_reader.go to apply memory-based rebatching when enabled
This commit is contained in:
@@ -24,3 +24,33 @@ func BatchSize() int {
|
|||||||
func BatchSizePtr() *int {
|
func BatchSizePtr() *int {
|
||||||
return &_BatchSize
|
return &_BatchSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// _BatchMem holds the maximum cumulative memory (in bytes) per batch when
|
||||||
|
// memory-based batching is requested. A value of 0 disables memory-based
|
||||||
|
// batching and falls back to count-based batching.
|
||||||
|
var _BatchMem = 0
|
||||||
|
var _BatchMemStr = ""
|
||||||
|
|
||||||
|
// SetBatchMem sets the memory budget per batch in bytes.
|
||||||
|
func SetBatchMem(n int) {
|
||||||
|
_BatchMem = n
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchMem returns the current memory budget per batch in bytes.
|
||||||
|
// A value of 0 means memory-based batching is disabled.
|
||||||
|
func BatchMem() int {
|
||||||
|
return _BatchMem
|
||||||
|
}
|
||||||
|
|
||||||
|
func BatchMemPtr() *int {
|
||||||
|
return &_BatchMem
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchMemStr returns the raw --batch-mem string value as provided on the CLI.
|
||||||
|
func BatchMemStr() string {
|
||||||
|
return _BatchMemStr
|
||||||
|
}
|
||||||
|
|
||||||
|
func BatchMemStrPtr() *string {
|
||||||
|
return &_BatchMemStr
|
||||||
|
}
|
||||||
|
|||||||
@@ -444,6 +444,62 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence {
|
|||||||
return newIter
|
return newIter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RebatchBySize reorganises the stream into batches whose cumulative estimated
|
||||||
|
// memory footprint does not exceed maxBytes. A single sequence larger than
|
||||||
|
// maxBytes is emitted alone rather than dropped. minSeqs sets a lower bound on
|
||||||
|
// batch size (in number of sequences) so that very large sequences still form
|
||||||
|
// reasonably-sized work units; use 1 to disable.
|
||||||
|
func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequence {
|
||||||
|
newIter := MakeIBioSequence()
|
||||||
|
|
||||||
|
newIter.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
newIter.WaitAndClose()
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
order := 0
|
||||||
|
iterator = iterator.SortBatches()
|
||||||
|
buffer := obiseq.MakeBioSequenceSlice()
|
||||||
|
bufBytes := 0
|
||||||
|
source := ""
|
||||||
|
|
||||||
|
flush := func() {
|
||||||
|
if len(buffer) > 0 {
|
||||||
|
newIter.Push(MakeBioSequenceBatch(source, order, buffer))
|
||||||
|
order++
|
||||||
|
buffer = obiseq.MakeBioSequenceSlice()
|
||||||
|
bufBytes = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for iterator.Next() {
|
||||||
|
seqs := iterator.Get()
|
||||||
|
source = seqs.Source()
|
||||||
|
for _, s := range seqs.Slice() {
|
||||||
|
sz := s.MemorySize()
|
||||||
|
// flush before adding if it would overflow, but only if
|
||||||
|
// we already meet the minimum sequence count
|
||||||
|
if bufBytes+sz > maxBytes && len(buffer) >= minSeqs {
|
||||||
|
flush()
|
||||||
|
}
|
||||||
|
buffer = append(buffer, s)
|
||||||
|
bufBytes += sz
|
||||||
|
}
|
||||||
|
}
|
||||||
|
flush()
|
||||||
|
|
||||||
|
newIter.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
if iterator.IsPaired() {
|
||||||
|
newIter.MarkAsPaired()
|
||||||
|
}
|
||||||
|
|
||||||
|
return newIter
|
||||||
|
}
|
||||||
|
|
||||||
func (iterator IBioSequence) FilterEmpty() IBioSequence {
|
func (iterator IBioSequence) FilterEmpty() IBioSequence {
|
||||||
|
|
||||||
newIter := MakeIBioSequence()
|
newIter := MakeIBioSequence()
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/DavidGamba/go-getoptions"
|
"github.com/DavidGamba/go-getoptions"
|
||||||
@@ -57,6 +58,10 @@ func RegisterGlobalOptions(options *getoptions.GetOpt) {
|
|||||||
options.GetEnv("OBIBATCHSIZE"),
|
options.GetEnv("OBIBATCHSIZE"),
|
||||||
options.Description("Number of sequence per batch for paralelle processing"))
|
options.Description("Number of sequence per batch for paralelle processing"))
|
||||||
|
|
||||||
|
options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "",
|
||||||
|
options.GetEnv("OBIBATCHMEM"),
|
||||||
|
options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G). Overrides --batch-size when set."))
|
||||||
|
|
||||||
options.Bool("solexa", false,
|
options.Bool("solexa", false,
|
||||||
options.GetEnv("OBISOLEXA"),
|
options.GetEnv("OBISOLEXA"),
|
||||||
options.Description("Decodes quality string according to the Solexa specification."))
|
options.Description("Decodes quality string according to the Solexa specification."))
|
||||||
@@ -157,6 +162,15 @@ func ProcessParsedOptions(options *getoptions.GetOpt, parseErr error) {
|
|||||||
if options.Called("solexa") {
|
if options.Called("solexa") {
|
||||||
obidefault.SetReadQualitiesShift(64)
|
obidefault.SetReadQualitiesShift(64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if options.Called("batch-mem") {
|
||||||
|
n, err := obiutils.ParseMemSize(obidefault.BatchMemStr())
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Invalid --batch-mem value %q: %v", obidefault.BatchMemStr(), err)
|
||||||
|
}
|
||||||
|
obidefault.SetBatchMem(n)
|
||||||
|
log.Printf("Memory-based batching enabled: %s per batch", obidefault.BatchMemStr())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GenerateOptionParser(program string,
|
func GenerateOptionParser(program string,
|
||||||
|
|||||||
@@ -273,6 +273,28 @@ func (s *BioSequence) Len() int {
|
|||||||
return len(s.sequence)
|
return len(s.sequence)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MemorySize returns an estimate of the memory footprint of the BioSequence
|
||||||
|
// in bytes. It accounts for the sequence, quality scores, feature data,
|
||||||
|
// annotations, and fixed struct overhead. The estimate is conservative
|
||||||
|
// (cap rather than len for byte slices) so it is suitable for memory-based
|
||||||
|
// batching decisions.
|
||||||
|
func (s *BioSequence) MemorySize() int {
|
||||||
|
if s == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
// fixed struct overhead (strings, pointers, mutex pointer)
|
||||||
|
const overhead = 128
|
||||||
|
n := overhead
|
||||||
|
n += cap(s.sequence)
|
||||||
|
n += cap(s.qualities)
|
||||||
|
n += cap(s.feature)
|
||||||
|
n += len(s.id)
|
||||||
|
n += len(s.source)
|
||||||
|
// rough annotation estimate: each key+value pair ~64 bytes on average
|
||||||
|
n += len(s.annotations) * 64
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
// HasQualities checks if the BioSequence has sequence qualitiy scores.
|
// HasQualities checks if the BioSequence has sequence qualitiy scores.
|
||||||
//
|
//
|
||||||
// This function does not have any parameters.
|
// This function does not have any parameters.
|
||||||
|
|||||||
@@ -1,13 +1,20 @@
|
|||||||
package obiseq
|
package obiseq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const _LargeSliceThreshold = 100 * 1024 // 100 kb — below: leave to GC, above: trigger explicit GC
|
||||||
|
const _GCBytesBudget = int64(256 * 1024 * 1024) // trigger GC every 256 MB of large discards
|
||||||
|
|
||||||
|
var _largeSliceDiscardedBytes = atomic.Int64{}
|
||||||
|
|
||||||
var _BioSequenceByteSlicePool = sync.Pool{
|
var _BioSequenceByteSlicePool = sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
bs := make([]byte, 0, 300)
|
bs := make([]byte, 0, 300)
|
||||||
@@ -34,6 +41,13 @@ func RecycleSlice(s *[]byte) {
|
|||||||
}
|
}
|
||||||
if cap(*s) <= 1024 {
|
if cap(*s) <= 1024 {
|
||||||
_BioSequenceByteSlicePool.Put(s)
|
_BioSequenceByteSlicePool.Put(s)
|
||||||
|
} else if cap(*s) >= _LargeSliceThreshold {
|
||||||
|
n := int64(cap(*s))
|
||||||
|
*s = nil
|
||||||
|
prev := _largeSliceDiscardedBytes.Load()
|
||||||
|
if _largeSliceDiscardedBytes.Add(n)/_GCBytesBudget > prev/_GCBytesBudget {
|
||||||
|
runtime.GC()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -214,6 +214,10 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) {
|
|||||||
|
|
||||||
iterator = iterator.Speed("Reading sequences")
|
iterator = iterator.Speed("Reading sequences")
|
||||||
|
|
||||||
|
if obidefault.BatchMem() > 0 {
|
||||||
|
iterator = iterator.RebatchBySize(obidefault.BatchMem(), 1)
|
||||||
|
}
|
||||||
|
|
||||||
return iterator, nil
|
return iterator, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
85
pkg/obiutils/memsize.go
Normal file
85
pkg/obiutils/memsize.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package obiutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"unicode"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ParseMemSize parses a human-readable memory size string and returns the
|
||||||
|
// equivalent number of bytes. The value is a number optionally followed by a
|
||||||
|
// unit suffix (case-insensitive):
|
||||||
|
//
|
||||||
|
// B or (no suffix) — bytes
|
||||||
|
// K or KB — kibibytes (1 024)
|
||||||
|
// M or MB — mebibytes (1 048 576)
|
||||||
|
// G or GB — gibibytes (1 073 741 824)
|
||||||
|
// T or TB — tebibytes (1 099 511 627 776)
|
||||||
|
//
|
||||||
|
// Examples: "512", "128K", "128k", "64M", "1G", "2GB"
|
||||||
|
func ParseMemSize(s string) (int, error) {
|
||||||
|
s = strings.TrimSpace(s)
|
||||||
|
if s == "" {
|
||||||
|
return 0, fmt.Errorf("empty memory size string")
|
||||||
|
}
|
||||||
|
|
||||||
|
// split numeric prefix from unit suffix
|
||||||
|
i := 0
|
||||||
|
for i < len(s) && (unicode.IsDigit(rune(s[i])) || s[i] == '.') {
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
numStr := s[:i]
|
||||||
|
unit := strings.ToUpper(strings.TrimSpace(s[i:]))
|
||||||
|
// strip trailing 'B' from two-letter units (KB→K, MB→M …)
|
||||||
|
if len(unit) == 2 && unit[1] == 'B' {
|
||||||
|
unit = unit[:1]
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err := strconv.ParseFloat(numStr, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("invalid memory size %q: %w", s, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var multiplier float64
|
||||||
|
switch unit {
|
||||||
|
case "", "B":
|
||||||
|
multiplier = 1
|
||||||
|
case "K":
|
||||||
|
multiplier = 1024
|
||||||
|
case "M":
|
||||||
|
multiplier = 1024 * 1024
|
||||||
|
case "G":
|
||||||
|
multiplier = 1024 * 1024 * 1024
|
||||||
|
case "T":
|
||||||
|
multiplier = 1024 * 1024 * 1024 * 1024
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("unknown memory unit %q in %q", unit, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return int(val * multiplier), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FormatMemSize formats a byte count as a human-readable string with the
|
||||||
|
// largest unit that produces a value ≥ 1 (e.g. 1536 → "1.5K").
|
||||||
|
func FormatMemSize(n int) string {
|
||||||
|
units := []struct {
|
||||||
|
suffix string
|
||||||
|
size int
|
||||||
|
}{
|
||||||
|
{"T", 1024 * 1024 * 1024 * 1024},
|
||||||
|
{"G", 1024 * 1024 * 1024},
|
||||||
|
{"M", 1024 * 1024},
|
||||||
|
{"K", 1024},
|
||||||
|
}
|
||||||
|
for _, u := range units {
|
||||||
|
if n >= u.size {
|
||||||
|
v := float64(n) / float64(u.size)
|
||||||
|
if v == float64(int(v)) {
|
||||||
|
return fmt.Sprintf("%d%s", int(v), u.suffix)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%.1f%s", v, u.suffix)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%dB", n)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user