diff --git a/pkg/obidefault/batch.go b/pkg/obidefault/batch.go index 5a128fa..d2cc10b 100644 --- a/pkg/obidefault/batch.go +++ b/pkg/obidefault/batch.go @@ -24,3 +24,33 @@ func BatchSize() int { func BatchSizePtr() *int { return &_BatchSize } + +// _BatchMem holds the maximum cumulative memory (in bytes) per batch when +// memory-based batching is requested. A value of 0 disables memory-based +// batching and falls back to count-based batching. +var _BatchMem = 0 +var _BatchMemStr = "" + +// SetBatchMem sets the memory budget per batch in bytes. +func SetBatchMem(n int) { + _BatchMem = n +} + +// BatchMem returns the current memory budget per batch in bytes. +// A value of 0 means memory-based batching is disabled. +func BatchMem() int { + return _BatchMem +} + +func BatchMemPtr() *int { + return &_BatchMem +} + +// BatchMemStr returns the raw --batch-mem string value as provided on the CLI. +func BatchMemStr() string { + return _BatchMemStr +} + +func BatchMemStrPtr() *string { + return &_BatchMemStr +} diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 76b6ab5..9e25d65 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -444,6 +444,62 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { return newIter } +// RebatchBySize reorganises the stream into batches whose cumulative estimated +// memory footprint does not exceed maxBytes. A single sequence larger than +// maxBytes is emitted alone rather than dropped. minSeqs sets a lower bound on +// batch size (in number of sequences) so that very large sequences still form +// reasonably-sized work units; use 1 to disable. +func (iterator IBioSequence) RebatchBySize(maxBytes int, minSeqs int) IBioSequence { + newIter := MakeIBioSequence() + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + order := 0 + iterator = iterator.SortBatches() + buffer := obiseq.MakeBioSequenceSlice() + bufBytes := 0 + source := "" + + flush := func() { + if len(buffer) > 0 { + newIter.Push(MakeBioSequenceBatch(source, order, buffer)) + order++ + buffer = obiseq.MakeBioSequenceSlice() + bufBytes = 0 + } + } + + for iterator.Next() { + seqs := iterator.Get() + source = seqs.Source() + for _, s := range seqs.Slice() { + sz := s.MemorySize() + // flush before adding if it would overflow, but only if + // we already meet the minimum sequence count + if bufBytes+sz > maxBytes && len(buffer) >= minSeqs { + flush() + } + buffer = append(buffer, s) + bufBytes += sz + } + } + flush() + + newIter.Done() + }() + + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + + return newIter +} + func (iterator IBioSequence) FilterEmpty() IBioSequence { newIter := MakeIBioSequence() diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 5109ac3..5a9878d 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -8,6 +8,7 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" log "github.com/sirupsen/logrus" "github.com/DavidGamba/go-getoptions" @@ -57,6 +58,10 @@ func RegisterGlobalOptions(options *getoptions.GetOpt) { options.GetEnv("OBIBATCHSIZE"), options.Description("Number of sequence per batch for paralelle processing")) + options.StringVar(obidefault.BatchMemStrPtr(), "batch-mem", "", + options.GetEnv("OBIBATCHMEM"), + options.Description("Maximum memory per batch (e.g. 128K, 64M, 1G). Overrides --batch-size when set.")) + options.Bool("solexa", false, options.GetEnv("OBISOLEXA"), options.Description("Decodes quality string according to the Solexa specification.")) @@ -157,6 +162,15 @@ func ProcessParsedOptions(options *getoptions.GetOpt, parseErr error) { if options.Called("solexa") { obidefault.SetReadQualitiesShift(64) } + + if options.Called("batch-mem") { + n, err := obiutils.ParseMemSize(obidefault.BatchMemStr()) + if err != nil { + log.Fatalf("Invalid --batch-mem value %q: %v", obidefault.BatchMemStr(), err) + } + obidefault.SetBatchMem(n) + log.Printf("Memory-based batching enabled: %s per batch", obidefault.BatchMemStr()) + } } func GenerateOptionParser(program string, diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index a362a34..b136bb2 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -273,6 +273,28 @@ func (s *BioSequence) Len() int { return len(s.sequence) } +// MemorySize returns an estimate of the memory footprint of the BioSequence +// in bytes. It accounts for the sequence, quality scores, feature data, +// annotations, and fixed struct overhead. The estimate is conservative +// (cap rather than len for byte slices) so it is suitable for memory-based +// batching decisions. +func (s *BioSequence) MemorySize() int { + if s == nil { + return 0 + } + // fixed struct overhead (strings, pointers, mutex pointer) + const overhead = 128 + n := overhead + n += cap(s.sequence) + n += cap(s.qualities) + n += cap(s.feature) + n += len(s.id) + n += len(s.source) + // rough annotation estimate: each key+value pair ~64 bytes on average + n += len(s.annotations) * 64 + return n +} + // HasQualities checks if the BioSequence has sequence qualitiy scores. // // This function does not have any parameters. diff --git a/pkg/obiseq/pool.go b/pkg/obiseq/pool.go index efe0bda..7c5be1f 100644 --- a/pkg/obiseq/pool.go +++ b/pkg/obiseq/pool.go @@ -1,13 +1,20 @@ package obiseq import ( + "runtime" "sync" + "sync/atomic" log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) +const _LargeSliceThreshold = 100 * 1024 // 100 kb — below: leave to GC, above: trigger explicit GC +const _GCBytesBudget = int64(256 * 1024 * 1024) // trigger GC every 256 MB of large discards + +var _largeSliceDiscardedBytes = atomic.Int64{} + var _BioSequenceByteSlicePool = sync.Pool{ New: func() interface{} { bs := make([]byte, 0, 300) @@ -34,6 +41,13 @@ func RecycleSlice(s *[]byte) { } if cap(*s) <= 1024 { _BioSequenceByteSlicePool.Put(s) + } else if cap(*s) >= _LargeSliceThreshold { + n := int64(cap(*s)) + *s = nil + prev := _largeSliceDiscardedBytes.Load() + if _largeSliceDiscardedBytes.Add(n)/_GCBytesBudget > prev/_GCBytesBudget { + runtime.GC() + } } } } diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index d05c881..a5e27bc 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -214,6 +214,10 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { iterator = iterator.Speed("Reading sequences") + if obidefault.BatchMem() > 0 { + iterator = iterator.RebatchBySize(obidefault.BatchMem(), 1) + } + return iterator, nil } diff --git a/pkg/obiutils/memsize.go b/pkg/obiutils/memsize.go new file mode 100644 index 0000000..b2f78d1 --- /dev/null +++ b/pkg/obiutils/memsize.go @@ -0,0 +1,85 @@ +package obiutils + +import ( + "fmt" + "strconv" + "strings" + "unicode" +) + +// ParseMemSize parses a human-readable memory size string and returns the +// equivalent number of bytes. The value is a number optionally followed by a +// unit suffix (case-insensitive): +// +// B or (no suffix) — bytes +// K or KB — kibibytes (1 024) +// M or MB — mebibytes (1 048 576) +// G or GB — gibibytes (1 073 741 824) +// T or TB — tebibytes (1 099 511 627 776) +// +// Examples: "512", "128K", "128k", "64M", "1G", "2GB" +func ParseMemSize(s string) (int, error) { + s = strings.TrimSpace(s) + if s == "" { + return 0, fmt.Errorf("empty memory size string") + } + + // split numeric prefix from unit suffix + i := 0 + for i < len(s) && (unicode.IsDigit(rune(s[i])) || s[i] == '.') { + i++ + } + numStr := s[:i] + unit := strings.ToUpper(strings.TrimSpace(s[i:])) + // strip trailing 'B' from two-letter units (KB→K, MB→M …) + if len(unit) == 2 && unit[1] == 'B' { + unit = unit[:1] + } + + val, err := strconv.ParseFloat(numStr, 64) + if err != nil { + return 0, fmt.Errorf("invalid memory size %q: %w", s, err) + } + + var multiplier float64 + switch unit { + case "", "B": + multiplier = 1 + case "K": + multiplier = 1024 + case "M": + multiplier = 1024 * 1024 + case "G": + multiplier = 1024 * 1024 * 1024 + case "T": + multiplier = 1024 * 1024 * 1024 * 1024 + default: + return 0, fmt.Errorf("unknown memory unit %q in %q", unit, s) + } + + return int(val * multiplier), nil +} + +// FormatMemSize formats a byte count as a human-readable string with the +// largest unit that produces a value ≥ 1 (e.g. 1536 → "1.5K"). +func FormatMemSize(n int) string { + units := []struct { + suffix string + size int + }{ + {"T", 1024 * 1024 * 1024 * 1024}, + {"G", 1024 * 1024 * 1024}, + {"M", 1024 * 1024}, + {"K", 1024}, + } + for _, u := range units { + if n >= u.size { + v := float64(n) / float64(u.size) + if v == float64(int(v)) { + return fmt.Sprintf("%d%s", int(v), u.suffix) + } + return fmt.Sprintf("%.1f%s", v, u.suffix) + } + } + return fmt.Sprintf("%dB", n) +}