mirror of
https://github.com/metabarcoding/obitools4.git
synced 2026-03-25 05:20:52 +00:00
Merge pull request #93 from metabarcoding/push-snmwxkwkqxrm
Memory-aware Batching and Static Linux Builds
This commit is contained in:
13
.github/workflows/release.yml
vendored
13
.github/workflows/release.yml
vendored
@@ -62,6 +62,12 @@ jobs:
|
||||
TAG=${GITHUB_REF#refs/tags/Release_}
|
||||
echo "version=$TAG" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Install build tools (Linux)
|
||||
if: runner.os == 'Linux'
|
||||
run: |
|
||||
sudo apt-get update -q
|
||||
sudo apt-get install -y musl-tools
|
||||
|
||||
- name: Install build tools (macOS)
|
||||
if: runner.os == 'macOS'
|
||||
run: |
|
||||
@@ -74,8 +80,13 @@ jobs:
|
||||
GOOS: ${{ matrix.goos }}
|
||||
GOARCH: ${{ matrix.goarch }}
|
||||
VERSION: ${{ steps.get_version.outputs.version }}
|
||||
CC: ${{ matrix.goos == 'linux' && 'musl-gcc' || '' }}
|
||||
run: |
|
||||
make obitools
|
||||
if [ "$GOOS" = "linux" ]; then
|
||||
make LDFLAGS='-linkmode=external -extldflags=-static' obitools
|
||||
else
|
||||
make obitools
|
||||
fi
|
||||
mkdir -p artifacts
|
||||
# Create a single tar.gz with all binaries for this platform
|
||||
tar -czf artifacts/obitools4_${VERSION}_${{ matrix.output_name }}.tar.gz -C build .
|
||||
|
||||
3
Makefile
3
Makefile
@@ -10,8 +10,9 @@ BLUE := \033[0;34m
|
||||
NC := \033[0m
|
||||
|
||||
GOFLAGS=
|
||||
LDFLAGS=
|
||||
GOCMD=go
|
||||
GOBUILD=$(GOCMD) build $(GOFLAGS)
|
||||
GOBUILD=$(GOCMD) build $(GOFLAGS) $(if $(LDFLAGS),-ldflags='$(LDFLAGS)')
|
||||
GOGENERATE=$(GOCMD) generate
|
||||
GOCLEAN=$(GOCMD) clean
|
||||
GOTEST=$(GOCMD) test
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
package obidefault
|
||||
|
||||
var _BatchSize = 2000
|
||||
// _BatchSize is the minimum number of sequences per batch (floor).
|
||||
// Used as the minSeqs argument to RebatchBySize.
|
||||
var _BatchSize = 1
|
||||
|
||||
// _BatchSizeMax is the maximum number of sequences per batch (ceiling).
|
||||
// A batch is flushed when this count is reached regardless of memory usage.
|
||||
var _BatchSizeMax = 2000
|
||||
|
||||
// SetBatchSize sets the size of the sequence batches.
|
||||
//
|
||||
@@ -24,3 +30,42 @@ func BatchSize() int {
|
||||
func BatchSizePtr() *int {
|
||||
return &_BatchSize
|
||||
}
|
||||
|
||||
// BatchSizeMax returns the maximum number of sequences per batch.
|
||||
func BatchSizeMax() int {
|
||||
return _BatchSizeMax
|
||||
}
|
||||
|
||||
func BatchSizeMaxPtr() *int {
|
||||
return &_BatchSizeMax
|
||||
}
|
||||
|
||||
// _BatchMem holds the maximum cumulative memory (in bytes) per batch when
|
||||
// memory-based batching is requested. A value of 0 disables memory-based
|
||||
// batching and falls back to count-based batching.
|
||||
var _BatchMem = 128 * 1024 * 1024 // 128 MB default; set to 0 to disable
|
||||
var _BatchMemStr = ""
|
||||
|
||||
// SetBatchMem sets the memory budget per batch in bytes.
|
||||
func SetBatchMem(n int) {
|
||||
_BatchMem = n
|
||||
}
|
||||
|
||||
// BatchMem returns the current memory budget per batch in bytes.
|
||||
// A value of 0 means memory-based batching is disabled.
|
||||
func BatchMem() int {
|
||||
return _BatchMem
|
||||
}
|
||||
|
||||
func BatchMemPtr() *int {
|
||||
return &_BatchMem
|
||||
}
|
||||
|
||||
// BatchMemStr returns the raw --batch-mem string value as provided on the CLI.
|
||||
func BatchMemStr() string {
|
||||
return _BatchMemStr
|
||||
}
|
||||
|
||||
func BatchMemStrPtr() *string {
|
||||
return &_BatchMemStr
|
||||
}
|
||||
|
||||
@@ -444,6 +444,67 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence {
|
||||
return newIter
|
||||
}
|
||||
|
||||
// RebatchBySize reorganises the stream into batches bounded by two independent
|
||||
// upper limits: maxCount (max number of sequences) and maxBytes (max cumulative
|
||||
// estimated memory). A batch is flushed as soon as either limit would be
|
||||
// exceeded. A single sequence larger than maxBytes is always emitted alone.
|
||||
// Passing 0 for a limit disables that constraint; if both are 0 it falls back
|
||||
// to Rebatch(obidefault.BatchSizeMax()).
|
||||
func (iterator IBioSequence) RebatchBySize(maxBytes int, maxCount int) IBioSequence {
|
||||
if maxBytes <= 0 && maxCount <= 0 {
|
||||
return iterator.Rebatch(obidefault.BatchSizeMax())
|
||||
}
|
||||
|
||||
newIter := MakeIBioSequence()
|
||||
|
||||
newIter.Add(1)
|
||||
|
||||
go func() {
|
||||
newIter.WaitAndClose()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
order := 0
|
||||
iterator = iterator.SortBatches()
|
||||
buffer := obiseq.MakeBioSequenceSlice()
|
||||
bufBytes := 0
|
||||
source := ""
|
||||
|
||||
flush := func() {
|
||||
if len(buffer) > 0 {
|
||||
newIter.Push(MakeBioSequenceBatch(source, order, buffer))
|
||||
order++
|
||||
buffer = obiseq.MakeBioSequenceSlice()
|
||||
bufBytes = 0
|
||||
}
|
||||
}
|
||||
|
||||
for iterator.Next() {
|
||||
seqs := iterator.Get()
|
||||
source = seqs.Source()
|
||||
for _, s := range seqs.Slice() {
|
||||
sz := s.MemorySize()
|
||||
countFull := maxCount > 0 && len(buffer) >= maxCount
|
||||
memFull := maxBytes > 0 && bufBytes+sz > maxBytes && len(buffer) > 0
|
||||
if countFull || memFull {
|
||||
flush()
|
||||
}
|
||||
buffer = append(buffer, s)
|
||||
bufBytes += sz
|
||||
}
|
||||
}
|
||||
flush()
|
||||
|
||||
newIter.Done()
|
||||
}()
|
||||
|
||||
if iterator.IsPaired() {
|
||||
newIter.MarkAsPaired()
|
||||
}
|
||||
|
||||
return newIter
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) FilterEmpty() IBioSequence {
|
||||
|
||||
newIter := MakeIBioSequence()
|
||||
@@ -638,7 +699,7 @@ func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate,
|
||||
trueIter.MarkAsPaired()
|
||||
}
|
||||
|
||||
return trueIter.Rebatch(size)
|
||||
return trueIter.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax())
|
||||
}
|
||||
|
||||
func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate,
|
||||
@@ -694,7 +755,7 @@ func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate,
|
||||
trueIter.MarkAsPaired()
|
||||
}
|
||||
|
||||
return trueIter.Rebatch(size)
|
||||
return trueIter.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax())
|
||||
}
|
||||
|
||||
// Load all sequences availables from an IBioSequenceBatch iterator into
|
||||
|
||||
@@ -3,6 +3,7 @@ package obiiter
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
|
||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
||||
)
|
||||
|
||||
@@ -70,7 +71,7 @@ func IFragments(minsize, length, overlap, size, nworkers int) Pipeable {
|
||||
}
|
||||
go f(iterator)
|
||||
|
||||
return newiter.SortBatches().Rebatch(size)
|
||||
return newiter.SortBatches().RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax())
|
||||
}
|
||||
|
||||
return ifrg
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
|
||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
|
||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/DavidGamba/go-getoptions"
|
||||
@@ -55,7 +56,15 @@ func RegisterGlobalOptions(options *getoptions.GetOpt) {
|
||||
|
||||
options.IntVar(obidefault.BatchSizePtr(), "batch-size", obidefault.BatchSize(),
|
||||
options.GetEnv("OBIBATCHSIZE"),
|
||||
options.Description("Number of sequence per batch for paralelle processing"))
|
||||
options.Description("Minimum number of sequences per batch (floor, default 1)"))
|
||||
|
||||
options.IntVar(obidefault.BatchSizeMaxPtr(), "batch-size-max", obidefault.BatchSizeMax(),
|
||||
options.GetEnv("OBIBATCHSIZEMAX"),
|
||||
options.Description("Maximum number of sequences per batch (ceiling, default 2000)"))
|
||||
|
||||
options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "",
|
||||
options.GetEnv("OBIBATCHMEM"),
|
||||
options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G; default: 128M). Set to 0 to disable."))
|
||||
|
||||
options.Bool("solexa", false,
|
||||
options.GetEnv("OBISOLEXA"),
|
||||
@@ -157,6 +166,15 @@ func ProcessParsedOptions(options *getoptions.GetOpt, parseErr error) {
|
||||
if options.Called("solexa") {
|
||||
obidefault.SetReadQualitiesShift(64)
|
||||
}
|
||||
|
||||
if options.Called("batch-mem") {
|
||||
n, err := obiutils.ParseMemSize(obidefault.BatchMemStr())
|
||||
if err != nil {
|
||||
log.Fatalf("Invalid --batch-mem value %q: %v", obidefault.BatchMemStr(), err)
|
||||
}
|
||||
obidefault.SetBatchMem(n)
|
||||
log.Printf("Memory-based batching enabled: %s per batch", obidefault.BatchMemStr())
|
||||
}
|
||||
}
|
||||
|
||||
func GenerateOptionParser(program string,
|
||||
|
||||
@@ -3,7 +3,7 @@ package obioptions
|
||||
// Version is automatically updated by the Makefile from version.txt
|
||||
// The patch number (third digit) is incremented on each push to the repository
|
||||
|
||||
var _Version = "Release 4.4.21"
|
||||
var _Version = "Release 4.4.22"
|
||||
|
||||
// Version returns the version of the obitools package.
|
||||
//
|
||||
|
||||
@@ -273,6 +273,28 @@ func (s *BioSequence) Len() int {
|
||||
return len(s.sequence)
|
||||
}
|
||||
|
||||
// MemorySize returns an estimate of the memory footprint of the BioSequence
|
||||
// in bytes. It accounts for the sequence, quality scores, feature data,
|
||||
// annotations, and fixed struct overhead. The estimate is conservative
|
||||
// (cap rather than len for byte slices) so it is suitable for memory-based
|
||||
// batching decisions.
|
||||
func (s *BioSequence) MemorySize() int {
|
||||
if s == nil {
|
||||
return 0
|
||||
}
|
||||
// fixed struct overhead (strings, pointers, mutex pointer)
|
||||
const overhead = 128
|
||||
n := overhead
|
||||
n += cap(s.sequence)
|
||||
n += cap(s.qualities)
|
||||
n += cap(s.feature)
|
||||
n += len(s.id)
|
||||
n += len(s.source)
|
||||
// rough annotation estimate: each key+value pair ~64 bytes on average
|
||||
n += len(s.annotations) * 64
|
||||
return n
|
||||
}
|
||||
|
||||
// HasQualities checks if the BioSequence has sequence qualitiy scores.
|
||||
//
|
||||
// This function does not have any parameters.
|
||||
|
||||
@@ -1,13 +1,20 @@
|
||||
package obiseq
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
||||
)
|
||||
|
||||
const _LargeSliceThreshold = 100 * 1024 // 100 kb — below: leave to GC, above: trigger explicit GC
|
||||
const _GCBytesBudget = int64(256 * 1024 * 1024) // trigger GC every 256 MB of large discards
|
||||
|
||||
var _largeSliceDiscardedBytes = atomic.Int64{}
|
||||
|
||||
var _BioSequenceByteSlicePool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
bs := make([]byte, 0, 300)
|
||||
@@ -34,6 +41,13 @@ func RecycleSlice(s *[]byte) {
|
||||
}
|
||||
if cap(*s) <= 1024 {
|
||||
_BioSequenceByteSlicePool.Put(s)
|
||||
} else if cap(*s) >= _LargeSliceThreshold {
|
||||
n := int64(cap(*s))
|
||||
*s = nil
|
||||
prev := _largeSliceDiscardedBytes.Load()
|
||||
if _largeSliceDiscardedBytes.Add(n)/_GCBytesBudget > prev/_GCBytesBudget {
|
||||
runtime.GC()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,6 +214,8 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) {
|
||||
|
||||
iterator = iterator.Speed("Reading sequences")
|
||||
|
||||
iterator = iterator.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax())
|
||||
|
||||
return iterator, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -291,5 +291,5 @@ func IndexReferenceDB(iterator obiiter.IBioSequence) obiiter.IBioSequence {
|
||||
go f()
|
||||
}
|
||||
|
||||
return indexed.Rebatch(obidefault.BatchSize())
|
||||
return indexed.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax())
|
||||
}
|
||||
|
||||
85
pkg/obiutils/memsize.go
Normal file
85
pkg/obiutils/memsize.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package obiutils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unicode"
|
||||
)
|
||||
|
||||
// ParseMemSize parses a human-readable memory size string and returns the
|
||||
// equivalent number of bytes. The value is a number optionally followed by a
|
||||
// unit suffix (case-insensitive):
|
||||
//
|
||||
// B or (no suffix) — bytes
|
||||
// K or KB — kibibytes (1 024)
|
||||
// M or MB — mebibytes (1 048 576)
|
||||
// G or GB — gibibytes (1 073 741 824)
|
||||
// T or TB — tebibytes (1 099 511 627 776)
|
||||
//
|
||||
// Examples: "512", "128K", "128k", "64M", "1G", "2GB"
|
||||
func ParseMemSize(s string) (int, error) {
|
||||
s = strings.TrimSpace(s)
|
||||
if s == "" {
|
||||
return 0, fmt.Errorf("empty memory size string")
|
||||
}
|
||||
|
||||
// split numeric prefix from unit suffix
|
||||
i := 0
|
||||
for i < len(s) && (unicode.IsDigit(rune(s[i])) || s[i] == '.') {
|
||||
i++
|
||||
}
|
||||
numStr := s[:i]
|
||||
unit := strings.ToUpper(strings.TrimSpace(s[i:]))
|
||||
// strip trailing 'B' from two-letter units (KB→K, MB→M …)
|
||||
if len(unit) == 2 && unit[1] == 'B' {
|
||||
unit = unit[:1]
|
||||
}
|
||||
|
||||
val, err := strconv.ParseFloat(numStr, 64)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid memory size %q: %w", s, err)
|
||||
}
|
||||
|
||||
var multiplier float64
|
||||
switch unit {
|
||||
case "", "B":
|
||||
multiplier = 1
|
||||
case "K":
|
||||
multiplier = 1024
|
||||
case "M":
|
||||
multiplier = 1024 * 1024
|
||||
case "G":
|
||||
multiplier = 1024 * 1024 * 1024
|
||||
case "T":
|
||||
multiplier = 1024 * 1024 * 1024 * 1024
|
||||
default:
|
||||
return 0, fmt.Errorf("unknown memory unit %q in %q", unit, s)
|
||||
}
|
||||
|
||||
return int(val * multiplier), nil
|
||||
}
|
||||
|
||||
// FormatMemSize formats a byte count as a human-readable string with the
|
||||
// largest unit that produces a value ≥ 1 (e.g. 1536 → "1.5K").
|
||||
func FormatMemSize(n int) string {
|
||||
units := []struct {
|
||||
suffix string
|
||||
size int
|
||||
}{
|
||||
{"T", 1024 * 1024 * 1024 * 1024},
|
||||
{"G", 1024 * 1024 * 1024},
|
||||
{"M", 1024 * 1024},
|
||||
{"K", 1024},
|
||||
}
|
||||
for _, u := range units {
|
||||
if n >= u.size {
|
||||
v := float64(n) / float64(u.size)
|
||||
if v == float64(int(v)) {
|
||||
return fmt.Sprintf("%d%s", int(v), u.suffix)
|
||||
}
|
||||
return fmt.Sprintf("%.1f%s", v, u.suffix)
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("%dB", n)
|
||||
}
|
||||
@@ -1 +1 @@
|
||||
4.4.21
|
||||
4.4.22
|
||||
|
||||
Reference in New Issue
Block a user