diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5e419df..ac1a0f8 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -62,6 +62,12 @@ jobs: TAG=${GITHUB_REF#refs/tags/Release_} echo "version=$TAG" >> $GITHUB_OUTPUT + - name: Install build tools (Linux) + if: runner.os == 'Linux' + run: | + sudo apt-get update -q + sudo apt-get install -y musl-tools + - name: Install build tools (macOS) if: runner.os == 'macOS' run: | @@ -74,8 +80,13 @@ jobs: GOOS: ${{ matrix.goos }} GOARCH: ${{ matrix.goarch }} VERSION: ${{ steps.get_version.outputs.version }} + CC: ${{ matrix.goos == 'linux' && 'musl-gcc' || '' }} run: | - make obitools + if [ "$GOOS" = "linux" ]; then + make LDFLAGS='-linkmode=external -extldflags=-static' obitools + else + make obitools + fi mkdir -p artifacts # Create a single tar.gz with all binaries for this platform tar -czf artifacts/obitools4_${VERSION}_${{ matrix.output_name }}.tar.gz -C build . diff --git a/Makefile b/Makefile index 1801300..8cd2a0c 100644 --- a/Makefile +++ b/Makefile @@ -10,8 +10,9 @@ BLUE := \033[0;34m NC := \033[0m GOFLAGS= +LDFLAGS= GOCMD=go -GOBUILD=$(GOCMD) build $(GOFLAGS) +GOBUILD=$(GOCMD) build $(GOFLAGS) $(if $(LDFLAGS),-ldflags='$(LDFLAGS)') GOGENERATE=$(GOCMD) generate GOCLEAN=$(GOCMD) clean GOTEST=$(GOCMD) test diff --git a/pkg/obidefault/batch.go b/pkg/obidefault/batch.go index 5a128fa..83e9d3a 100644 --- a/pkg/obidefault/batch.go +++ b/pkg/obidefault/batch.go @@ -1,6 +1,12 @@ package obidefault -var _BatchSize = 2000 +// _BatchSize is the minimum number of sequences per batch (floor). +// Used as the minSeqs argument to RebatchBySize. +var _BatchSize = 1 + +// _BatchSizeMax is the maximum number of sequences per batch (ceiling). +// A batch is flushed when this count is reached regardless of memory usage. +var _BatchSizeMax = 2000 // SetBatchSize sets the size of the sequence batches. 
// @@ -24,3 +30,42 @@ func BatchSize() int { func BatchSizePtr() *int { return &_BatchSize } + +// BatchSizeMax returns the maximum number of sequences per batch. +func BatchSizeMax() int { + return _BatchSizeMax +} + +func BatchSizeMaxPtr() *int { + return &_BatchSizeMax +} + +// _BatchMem holds the maximum cumulative memory (in bytes) per batch when +// memory-based batching is requested. A value of 0 disables memory-based +// batching and falls back to count-based batching. +var _BatchMem = 128 * 1024 * 1024 // 128 MB default; set to 0 to disable +var _BatchMemStr = "" + +// SetBatchMem sets the memory budget per batch in bytes. +func SetBatchMem(n int) { + _BatchMem = n +} + +// BatchMem returns the current memory budget per batch in bytes. +// A value of 0 means memory-based batching is disabled. +func BatchMem() int { + return _BatchMem +} + +func BatchMemPtr() *int { + return &_BatchMem +} + +// BatchMemStr returns the raw --batch-mem string value as provided on the CLI. +func BatchMemStr() string { + return _BatchMemStr +} + +func BatchMemStrPtr() *string { + return &_BatchMemStr +} diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 76b6ab5..e7d51a1 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -444,6 +444,67 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { return newIter } +// RebatchBySize reorganises the stream into batches bounded by two independent +// upper limits: maxCount (max number of sequences) and maxBytes (max cumulative +// estimated memory). A batch is flushed as soon as either limit would be +// exceeded. A single sequence larger than maxBytes is always emitted alone. +// Passing 0 for a limit disables that constraint; if both are 0 it falls back +// to Rebatch(obidefault.BatchSizeMax()). 
+func (iterator IBioSequence) RebatchBySize(maxBytes int, maxCount int) IBioSequence { + if maxBytes <= 0 && maxCount <= 0 { + return iterator.Rebatch(obidefault.BatchSizeMax()) + } + + newIter := MakeIBioSequence() + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + order := 0 + iterator = iterator.SortBatches() + buffer := obiseq.MakeBioSequenceSlice() + bufBytes := 0 + source := "" + + flush := func() { + if len(buffer) > 0 { + newIter.Push(MakeBioSequenceBatch(source, order, buffer)) + order++ + buffer = obiseq.MakeBioSequenceSlice() + bufBytes = 0 + } + } + + for iterator.Next() { + seqs := iterator.Get() + source = seqs.Source() + for _, s := range seqs.Slice() { + sz := s.MemorySize() + countFull := maxCount > 0 && len(buffer) >= maxCount + memFull := maxBytes > 0 && bufBytes+sz > maxBytes && len(buffer) > 0 + if countFull || memFull { + flush() + } + buffer = append(buffer, s) + bufBytes += sz + } + } + flush() + + newIter.Done() + }() + + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + + return newIter +} + func (iterator IBioSequence) FilterEmpty() IBioSequence { newIter := MakeIBioSequence() @@ -638,7 +699,7 @@ func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate, trueIter.MarkAsPaired() } - return trueIter.Rebatch(size) + return trueIter.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax()) } func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate, @@ -694,7 +755,7 @@ func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate, trueIter.MarkAsPaired() } - return trueIter.Rebatch(size) + return trueIter.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax()) } // Load all sequences availables from an IBioSequenceBatch iterator into diff --git a/pkg/obiiter/fragment.go b/pkg/obiiter/fragment.go index 7e2fd1b..f1b0703 100644 --- a/pkg/obiiter/fragment.go +++ b/pkg/obiiter/fragment.go @@ -3,6 +3,7 @@ package obiiter import ( log 
"github.com/sirupsen/logrus" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" ) @@ -70,7 +71,7 @@ func IFragments(minsize, length, overlap, size, nworkers int) Pipeable { } go f(iterator) - return newiter.SortBatches().Rebatch(size) + return newiter.SortBatches().RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax()) } return ifrg diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 5109ac3..5ebb471 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -8,6 +8,7 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" log "github.com/sirupsen/logrus" "github.com/DavidGamba/go-getoptions" @@ -55,7 +56,15 @@ func RegisterGlobalOptions(options *getoptions.GetOpt) { options.IntVar(obidefault.BatchSizePtr(), "batch-size", obidefault.BatchSize(), options.GetEnv("OBIBATCHSIZE"), - options.Description("Number of sequence per batch for paralelle processing")) + options.Description("Minimum number of sequences per batch (floor, default 1)")) + + options.IntVar(obidefault.BatchSizeMaxPtr(), "batch-size-max", obidefault.BatchSizeMax(), + options.GetEnv("OBIBATCHSIZEMAX"), + options.Description("Maximum number of sequences per batch (ceiling, default 2000)")) + + options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "", + options.GetEnv("OBIBATCHMEM"), + options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G; default: 128M). 
Set to 0 to disable.")) options.Bool("solexa", false, options.GetEnv("OBISOLEXA"), @@ -157,6 +166,15 @@ func ProcessParsedOptions(options *getoptions.GetOpt, parseErr error) { if options.Called("solexa") { obidefault.SetReadQualitiesShift(64) } + + if options.Called("batch-mem") { + n, err := obiutils.ParseMemSize(obidefault.BatchMemStr()) + if err != nil { + log.Fatalf("Invalid --batch-mem value %q: %v", obidefault.BatchMemStr(), err) + } + obidefault.SetBatchMem(n) + log.Printf("Memory-based batching enabled: %s per batch", obidefault.BatchMemStr()) + } } func GenerateOptionParser(program string, diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 7f851b0..89279ab 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -3,7 +3,7 @@ package obioptions // Version is automatically updated by the Makefile from version.txt // The patch number (third digit) is incremented on each push to the repository -var _Version = "Release 4.4.21" +var _Version = "Release 4.4.22" // Version returns the version of the obitools package. // diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index a362a34..b136bb2 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -273,6 +273,28 @@ func (s *BioSequence) Len() int { return len(s.sequence) } +// MemorySize returns an estimate of the memory footprint of the BioSequence +// in bytes. It accounts for the sequence, quality scores, feature data, +// annotations, and fixed struct overhead. The estimate is conservative +// (cap rather than len for byte slices) so it is suitable for memory-based +// batching decisions. 
+func (s *BioSequence) MemorySize() int { + if s == nil { + return 0 + } + // fixed struct overhead (strings, pointers, mutex pointer) + const overhead = 128 + n := overhead + n += cap(s.sequence) + n += cap(s.qualities) + n += cap(s.feature) + n += len(s.id) + n += len(s.source) + // rough annotation estimate: each key+value pair ~64 bytes on average + n += len(s.annotations) * 64 + return n +} + // HasQualities checks if the BioSequence has sequence qualitiy scores. // // This function does not have any parameters. diff --git a/pkg/obiseq/pool.go b/pkg/obiseq/pool.go index efe0bda..7c5be1f 100644 --- a/pkg/obiseq/pool.go +++ b/pkg/obiseq/pool.go @@ -1,13 +1,20 @@ package obiseq import ( + "runtime" "sync" + "sync/atomic" log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) +const _LargeSliceThreshold = 100 * 1024 // 100 kb — below: leave to GC, above: trigger explicit GC +const _GCBytesBudget = int64(256 * 1024 * 1024) // trigger GC every 256 MB of large discards + +var _largeSliceDiscardedBytes = atomic.Int64{} + var _BioSequenceByteSlicePool = sync.Pool{ New: func() interface{} { bs := make([]byte, 0, 300) @@ -34,6 +41,13 @@ func RecycleSlice(s *[]byte) { } if cap(*s) <= 1024 { _BioSequenceByteSlicePool.Put(s) + } else if cap(*s) >= _LargeSliceThreshold { + n := int64(cap(*s)) + *s = nil + prev := _largeSliceDiscardedBytes.Load() + if _largeSliceDiscardedBytes.Add(n)/_GCBytesBudget > prev/_GCBytesBudget { + runtime.GC() + } } } } diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index d05c881..cbda802 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -214,6 +214,8 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { iterator = iterator.Speed("Reading sequences") + iterator = iterator.RebatchBySize(obidefault.BatchMem(), obidefault.BatchSizeMax()) + return iterator, nil } diff 
// ParseMemSize parses a human-readable memory size string and returns the
// equivalent number of bytes. The value is a non-negative decimal number
// (a fractional part is allowed) optionally followed by a unit suffix
// (case-insensitive, surrounding blanks ignored):
//
//	B or (no suffix) — bytes
//	K, KB or KiB     — kibibytes (1 024)
//	M, MB or MiB     — mebibytes (1 048 576)
//	G, GB or GiB     — gibibytes (1 073 741 824)
//	T, TB or TiB     — tebibytes (1 099 511 627 776)
//
// Fractional results are truncated towards zero.
//
// Examples: "512", "128K", "128k", "64M", "1.5G", "2GB"
//
// An error is returned when the string is empty, when the numeric part is
// missing or malformed, when the unit is not one of those listed above, or
// when the resulting byte count does not fit in an int.
func ParseMemSize(s string) (int, error) {
	s = strings.TrimSpace(s)
	if s == "" {
		return 0, fmt.Errorf("empty memory size string")
	}

	// Split the numeric prefix (digits and '.') from the unit suffix.
	i := 0
	for i < len(s) && (unicode.IsDigit(rune(s[i])) || s[i] == '.') {
		i++
	}
	numStr := s[:i]
	unit := strings.ToUpper(strings.TrimSpace(s[i:]))

	val, err := strconv.ParseFloat(numStr, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid memory size %q: %w", s, err)
	}

	// Every accepted spelling is listed explicitly so that malformed
	// suffixes such as "BB" or "XB" are rejected instead of being reduced
	// to a valid one-letter unit.
	var multiplier float64
	switch unit {
	case "", "B":
		multiplier = 1
	case "K", "KB", "KIB":
		multiplier = 1 << 10
	case "M", "MB", "MIB":
		multiplier = 1 << 20
	case "G", "GB", "GIB":
		multiplier = 1 << 30
	case "T", "TB", "TIB":
		multiplier = 1 << 40
	default:
		return 0, fmt.Errorf("unknown memory unit %q in %q", unit, s)
	}

	// intLimit is the first value too large for an int on this platform.
	// Converting an out-of-range float to int is implementation-defined, so
	// the range is checked before converting.
	const intLimit = float64(1 << (strconv.IntSize - 1))
	total := val * multiplier
	if total >= intLimit {
		return 0, fmt.Errorf("memory size %q overflows int", s)
	}

	return int(total), nil
}

// FormatMemSize formats a byte count as a human-readable string with the
// largest binary unit (K, M, G, T) that produces a value ≥ 1, e.g.
// 1536 → "1.5K" and 2048 → "2K". Whole multiples are printed without a
// fractional part, other values with one decimal. Counts below 1024
// (including negative ones) are printed in bytes with a "B" suffix.
func FormatMemSize(n int) string {
	// Sizes are int64 so the tebibyte constant also compiles where int is
	// 32 bits wide.
	units := []struct {
		suffix string
		size   int64
	}{
		{"T", 1 << 40},
		{"G", 1 << 30},
		{"M", 1 << 20},
		{"K", 1 << 10},
	}
	for _, u := range units {
		if int64(n) >= u.size {
			v := float64(n) / float64(u.size)
			if v == float64(int64(v)) {
				return fmt.Sprintf("%d%s", int64(v), u.suffix)
			}
			return fmt.Sprintf("%.1f%s", v, u.suffix)
		}
	}
	return fmt.Sprintf("%dB", n)
}