Reduce memory allocation events

Former-commit-id: fbdb2afc857b02adc2593e2278d3bd838e99b0b2
This commit is contained in:
Eric Coissac
2024-06-22 21:01:53 +02:00
parent 54a138196c
commit e6b87ecd02
19 changed files with 166 additions and 75 deletions

View File

@ -35,6 +35,7 @@ func main() {
_, args := optionParser(os.Args)
obioptions.SetStrictReadWorker(min(4, obioptions.CLIParallelWorkers()))
fs, err := obiconvert.CLIReadBioSequences(args...)
if err != nil {

View File

@ -32,6 +32,9 @@ func main() {
optionParser := obioptions.GenerateOptionParser(obipairing.OptionSet)
optionParser(os.Args)
obioptions.SetStrictReadWorker(2)
obioptions.SetStrictWriteWorker(2)
pairs, err := obipairing.CLIPairedSequence()
if err != nil {

View File

@ -33,7 +33,9 @@ func main() {
// defer trace.Stop()
obioptions.SetWorkerPerCore(2)
obioptions.SetReadWorkerPerCore(0.5)
obioptions.SetStrictReadWorker(1)
obioptions.SetStrictWriteWorker(1)
obioptions.SetBatchSize(10)
optionParser := obioptions.GenerateOptionParser(obitag.OptionSet)

View File

@ -124,25 +124,26 @@ func BuildAlignment(seqA, seqB *obiseq.BioSequence,
// In that case arenas will be allocated by the function but, they will not
// be reusable for other alignments and desallocated at the BuildQualityConsensus
// return.
func BuildQualityConsensus(seqA, seqB *obiseq.BioSequence, path []int, statOnMismatch bool) (*obiseq.BioSequence, int) {
func BuildQualityConsensus(seqA, seqB *obiseq.BioSequence, path []int, statOnMismatch bool,
arenaAlign PEAlignArena) (*obiseq.BioSequence, int) {
bufferSA := obiseq.GetSlice(seqA.Len())
bufferSB := obiseq.GetSlice(seqB.Len())
defer obiseq.RecycleSlice(&bufferSB)
bufferSA := arenaAlign.pointer.aligneSeqA
bufferSB := arenaAlign.pointer.aligneSeqB
// defer obiseq.RecycleSlice(&bufferSB)
bufferQA := obiseq.GetSlice(seqA.Len())
bufferQB := obiseq.GetSlice(seqB.Len())
defer obiseq.RecycleSlice(&bufferQB)
bufferQA := arenaAlign.pointer.aligneQualA
bufferQB := arenaAlign.pointer.aligneQualB
// defer obiseq.RecycleSlice(&bufferQB)
_BuildAlignment(seqA.Sequence(), seqB.Sequence(), path, ' ',
&bufferSA, &bufferSB)
bufferSA, bufferSB)
// log.Printf("#1 %s--> la : %d,%p lb : %d,%p qa : %d,%p qb : %d,%p\n", stamp,
// len(*bufferSA), bufferSA, len(*bufferSB), bufferSB,
// len(*bufferQA), bufferQA, len(*bufferQB), bufferQB)
_BuildAlignment(seqA.Qualities(), seqB.Qualities(), path, byte(0),
&bufferQA, &bufferQB)
bufferQA, bufferQB)
// log.Printf("#2 %s--> la : %d,%p lb : %d,%p qa : %d,%p qb : %d,%p\n", stamp,
// len(*bufferSA), bufferSA, len(*bufferSB), bufferSB,
@ -157,10 +158,10 @@ func BuildQualityConsensus(seqA, seqB *obiseq.BioSequence, path []int, statOnMis
match := 0
for i, qA = range bufferQA {
nA := bufferSA[i]
nB := bufferSB[i]
qB = bufferQB[i]
for i, qA = range *bufferQA {
nA := (*bufferSA)[i]
nB := (*bufferSB)[i]
qB = (*bufferQB)[i]
if statOnMismatch && nA != nB && nA != ' ' && nB != ' ' {
mismatches[strings.ToUpper(fmt.Sprintf("(%c:%02d)->(%c:%02d)", nA, qA, nB, qB))] = i + 1
@ -171,13 +172,13 @@ func BuildQualityConsensus(seqA, seqB *obiseq.BioSequence, path []int, statOnMis
qm = qB
}
if qB > qA {
bufferSA[i] = bufferSB[i]
(*bufferSA)[i] = (*bufferSB)[i]
qM = qB
qm = qA
}
if qB == qA && nA != nB {
nuc := _FourBitsBaseCode[nA&31] | _FourBitsBaseCode[nB&31]
bufferSA[i] = _FourBitsBaseDecode[nuc]
(*bufferSA)[i] = _FourBitsBaseDecode[nuc]
}
q := qA + qB
@ -195,15 +196,15 @@ func BuildQualityConsensus(seqA, seqB *obiseq.BioSequence, path []int, statOnMis
q = 90
}
bufferQA[i] = q
(*bufferQA)[i] = q
}
consSeq := obiseq.NewBioSequence(
seqA.Id(),
bufferSA,
*bufferSA,
seqA.Definition(),
)
consSeq.SetQualities(bufferQA)
consSeq.SetQualities(*bufferQA)
if statOnMismatch && len(mismatches) > 0 {
consSeq.SetAttribute("pairing_mismatches", mismatches)

View File

@ -13,6 +13,10 @@ type _PeAlignArena struct {
path []int
fastIndex [][]int
fastBuffer []byte
aligneSeqA *[]byte
aligneSeqB *[]byte
aligneQualA *[]byte
aligneQualB *[]byte
}
// PEAlignArena defines memory arena usable by the
@ -30,12 +34,21 @@ var NilPEAlignArena = PEAlignArena{nil}
// MakePEAlignArena makes a new arena for the alignment of two paired sequences
// of maximum length indicated by lseqA and lseqB.
func MakePEAlignArena(lseqA, lseqB int) PEAlignArena {
aligneSeqA := make([]byte, 0, lseqA+lseqB)
aligneSeqB := make([]byte, 0, lseqA+lseqB)
aligneQualA := make([]byte, 0, lseqA+lseqB)
aligneQualB := make([]byte, 0, lseqA+lseqB)
a := _PeAlignArena{
scoreMatrix: make([]int, 0, (lseqA+1)*(lseqB+1)),
pathMatrix: make([]int, 0, (lseqA+1)*(lseqB+1)),
path: make([]int, 2*(lseqA+lseqB)),
fastIndex: make([][]int, 256),
fastBuffer: make([]byte, 0, lseqA),
aligneSeqA: &aligneSeqA,
aligneSeqB: &aligneSeqB,
aligneQualA: &aligneQualA,
aligneQualB: &aligneQualB,
}
return PEAlignArena{&a}
@ -352,7 +365,7 @@ func PERightAlign(seqA, seqB *obiseq.BioSequence, gap, scale float64,
func PEAlign(seqA, seqB *obiseq.BioSequence,
gap, scale float64, fastAlign bool, delta int, fastScoreRel bool,
arena PEAlignArena) (int, []int, int, int, float64) {
arena PEAlignArena, shift_buff *map[int]int) (int, []int, int, int, float64) {
var score, shift int
var startA, startB int
var partLen, over int
@ -374,7 +387,7 @@ func PEAlign(seqA, seqB *obiseq.BioSequence,
&arena.pointer.fastIndex,
&arena.pointer.fastBuffer)
shift, fastCount, fastScore = obikmer.FastShiftFourMer(index, seqA.Len(), seqB, fastScoreRel, nil)
shift, fastCount, fastScore = obikmer.FastShiftFourMer(index, shift_buff, seqA.Len(), seqB, fastScoreRel, nil)
if shift > 0 {
over = seqA.Len() - shift

View File

@ -169,7 +169,9 @@ func _ParseEmblFile(source string, input ChannelSeqFileChunk,
func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
opt := MakeOptions(options)
entry_channel := ReadSeqFileChunk(reader, _EndOfLastEntry)
buff := make([]byte, 1024*1024*1024*256)
entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry)
newIter := obiiter.MakeIBioSequence()
nworkers := opt.ParallelWorkers()
@ -179,7 +181,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
newIter.Add(1)
go _ParseEmblFile(opt.Source(), entry_channel, newIter,
opt.WithFeatureTable(),
opt.BatchSize(),
opt.BatchSize(),
opt.TotalSeqSize())
}

View File

@ -228,7 +228,9 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
nworker := opt.ParallelWorkers()
chkchan := ReadSeqFileChunk(reader, _EndOfLastFastaEntry)
buff := make([]byte, 1024*1024*1024)
chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastaEntry)
chunck_order := obiutils.AtomicCounter()
for i := 0; i < nworker; i++ {

View File

@ -112,7 +112,7 @@ func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality
}
for i := 0; i < len(q); i++ {
q[i] = q[i] - quality_shift
q[i] -= quality_shift
}
out.SetQualities(q)
}
@ -309,7 +309,9 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
nworker := opt.ParallelWorkers()
chunkorder := obiutils.AtomicCounter()
chkchan := ReadSeqFileChunk(reader, _EndOfLastFastqEntry)
buff := make([]byte, 1024*1024*1024)
chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastqEntry)
for i := 0; i < nworker; i++ {
out.Add(1)

View File

@ -46,17 +46,8 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch,
for _, seq := range batch.Slice() {
if seq.Len() > 0 {
fs := FormatFastq(seq, formater)
lb := bs.Len()
n, _ := bs.WriteString(fs)
if n < len(fs) {
log.Panicln("FormatFastqBatch: Cannot write all FASTQ sequences")
}
bs.WriteString(fs)
bs.WriteString("\n")
if bs.Len()-lb < len(fs)+1 {
log.Panicln("FormatFastqBatch: Cannot write all FASTQ sequences correctly")
}
} else {
if skipEmpty {
log.Warnf("Sequence %s is empty and skiped in output", seq.Id())
@ -69,12 +60,6 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch,
chunk := bs.Bytes()
chunk = chunk[:bs.Len()]
if chunk[0] != '@' {
log.Panicln("FormatFastqBatch: FASTQ format error")
}
return chunk
}

View File

@ -233,7 +233,9 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
opt := MakeOptions(options)
// entry_channel := make(chan _FileChunk)
entry_channel := ReadSeqFileChunk(reader, _EndOfLastEntry)
buff := make([]byte, 1024*1024*1024*256)
entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry)
newIter := obiiter.MakeIBioSequence()
nworkers := opt.ParallelWorkers()

View File

@ -33,10 +33,10 @@ type LastSeqRecord func([]byte) int
// Returns:
// None
func ReadSeqFileChunk(reader io.Reader,
buff []byte,
splitter LastSeqRecord) ChannelSeqFileChunk {
var err error
var fullbuff []byte
var buff []byte
chunk_channel := make(ChannelSeqFileChunk)
@ -46,8 +46,7 @@ func ReadSeqFileChunk(reader io.Reader,
i := 0
// Initialize the buffer to the size of a chunk of data
fullbuff = make([]byte, _FileChunkSize, _FileChunkSize*2)
buff = fullbuff
fullbuff = buff
// Read from the reader until the buffer is full or the end of the file is reached
l, err = io.ReadFull(reader, buff)

View File

@ -99,20 +99,20 @@ func Index4mer(seq *obiseq.BioSequence, index *[][]int, buffer *[]byte) [][]int
// FastShiftFourMer runs a Fast algorithm (similar to the one used in FASTA) to compare two sequences.
// The returned values are two integer values. The shift between both the sequences and the count of
// matching 4mer when this shift is applied between both the sequences.
func FastShiftFourMer(index [][]int, lindex int, seq *obiseq.BioSequence, relscore bool, buffer *[]byte) (int, int, float64) {
func FastShiftFourMer(index [][]int, shifts *map[int]int, lindex int, seq *obiseq.BioSequence, relscore bool, buffer *[]byte) (int, int, float64) {
iternal_buffer := Encode4mer(seq, buffer)
shifts := make(map[int]int, 3*seq.Len())
// shifts := make(map[int]int, 3*seq.Len())
for pos, code := range iternal_buffer {
for _, refpos := range index[code] {
shift := refpos - pos
count, ok := shifts[shift]
count, ok := (*shifts)[shift]
if ok {
shifts[shift] = count + 1
(*shifts)[shift] = count + 1
} else {
shifts[shift] = 1
(*shifts)[shift] = 1
}
}
}
@ -121,7 +121,8 @@ func FastShiftFourMer(index [][]int, lindex int, seq *obiseq.BioSequence, relsco
maxcount := 0
maxscore := -1.0
for shift, count := range shifts {
for shift, count := range *shifts {
delete((*shifts), shift)
score := float64(count)
if relscore {
over := -shift

View File

@ -15,11 +15,13 @@ import (
var _Debug = false
var _WorkerPerCore = 2.0
var _ReadWorkerPerCore = 1.0
var _ReadWorkerPerCore = 0.5
var _WriteWorkerPerCore = 0.25
var _StrictReadWorker = 0
var _StrictWriteWorker = 0
var _ParallelFilesRead = 0
var _MaxAllowedCPU = runtime.NumCPU()
var _BatchSize = 5000
var _BatchSize = 2000
var _Pprof = false
var _Quality_Shift_Input = byte(33)
var _Quality_Shift_Output = byte(33)
@ -175,12 +177,37 @@ func CLIParallelWorkers() int {
// Returns an integer representing the number of parallel workers.
func CLIReadParallelWorkers() int {
if StrictReadWorker() == 0 {
return int(float64(CLIMaxCPU()) * ReadWorkerPerCore())
n := int(float64(CLIMaxCPU()) * ReadWorkerPerCore())
if n == 0 {
n = 1
}
return n
} else {
return StrictReadWorker()
}
}
// CLIWriteParallelWorkers returns the number of parallel workers used for
// writing files.
//
// The number of parallel workers is determined by the command line option
// --max-cpu|-m and the environment variable OBIMAXCPU. This number is
// multiplied by the variable _WriteWorkerPerCore.
//
// No parameters.
// Returns an integer representing the number of parallel workers.
func CLIWriteParallelWorkers() int {
if StrictWriteWorker() == 0 {
n := int(float64(CLIMaxCPU()) * WriteWorkerPerCore())
if n == 0 {
n = 1
}
return n
} else {
return StrictWriteWorker()
}
}
// CLIMaxCPU returns the maximum number of CPU cores allowed.
//
// The maximum number of CPU cores is determined by the command line option
@ -247,6 +274,15 @@ func ReadWorkerPerCore() float64 {
return _ReadWorkerPerCore
}
// WriteWorkerPerCore returns the number of worker per CPU core for
// computing the result.
//
// No parameters.
// Returns a float64 representing the number of worker per CPU core.
func WriteWorkerPerCore() float64 {
return _WriteWorkerPerCore
}
// SetBatchSize sets the size of the sequence batches.
//
// n - an integer representing the size of the sequence batches.
@ -318,13 +354,33 @@ func StrictReadWorker() int {
return _StrictReadWorker
}
// SetWriteWorker sets the number of workers for writing files.
//
// The number of worker dedicated to writing files is determined
// as the number of allowed CPU cores multiplied by number of write workers per core.
// Setting the number of write workers using this function allows to decouple the number
// of write workers from the number of CPU cores.
//
// n - an integer representing the number of workers to be set.
func SetStrictWriteWorker(n int) {
_StrictWriteWorker = n
}
// WriteWorker returns the number of workers for writing files.
//
// No parameters.
// Returns an integer representing the number of workers.
func StrictWriteWorker() int {
return _StrictWriteWorker
}
// ParallelFilesRead returns the number of files to be read in parallel.
//
// No parameters.
// Returns an integer representing the number of files to be read.
func ParallelFilesRead() int {
if _ParallelFilesRead == 0 {
return CLIParallelWorkers()
return CLIReadParallelWorkers()
} else {
return _ParallelFilesRead
}

View File

@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be
// commited
var _Commit = "612868a"
var _Commit = "bcaa264"
var _Version = "Release 4.2.0"
// Version returns the version of the obitools package.

View File

@ -12,6 +12,7 @@ package obiseq
import (
"crypto/md5"
"slices"
"sync"
"sync/atomic"
@ -418,12 +419,15 @@ func (s *BioSequence) SetFeatures(feature []byte) {
s.feature = feature
}
// Setting the sequence of the BioSequence.
// SetSequence sets the sequence of the BioSequence.
//
// Parameters:
// - sequence: a byte slice representing the sequence to be set.
func (s *BioSequence) SetSequence(sequence []byte) {
if s.sequence != nil {
RecycleSlice(&s.sequence)
}
s.sequence = CopySlice(obiutils.InPlaceToLower(sequence))
s.sequence = obiutils.InPlaceToLower(CopySlice(sequence))
}
// Setting the qualities of the BioSequence.
@ -507,3 +511,15 @@ func (s *BioSequence) Composition() map[byte]int {
return counts
}
func (s *BioSequence) Grow(length int) {
if s.sequence == nil {
s.sequence = GetSlice(length)
} else {
s.sequence = slices.Grow(s.sequence, length)
}
if s.qualities != nil {
s.qualities = slices.Grow(s.qualities, length)
}
}

View File

@ -84,7 +84,7 @@ func CopySlice(src []byte) []byte {
var BioSequenceAnnotationPool = sync.Pool{
New: func() interface{} {
bs := make(Annotation, 5)
bs := make(Annotation, 1)
return &bs
},
}
@ -105,15 +105,17 @@ func RecycleAnnotation(a *Annotation) {
//
// It returns an Annotation.
func GetAnnotation(values ...Annotation) Annotation {
a := Annotation(nil)
a := (*Annotation)(nil)
for a == nil {
a = *(BioSequenceAnnotationPool.Get().(*Annotation))
for a == nil || (*a == nil) {
a = BioSequenceAnnotationPool.Get().(*Annotation)
}
annot := *a
if len(values) > 0 {
obiutils.MustFillMap(a, values[0])
obiutils.MustFillMap(annot, values[0])
}
return a
return annot
}

View File

@ -53,10 +53,7 @@ func CLIWriteBioSequences(iterator obiiter.IBioSequence,
opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader))
}
nworkers := obioptions.CLIParallelWorkers() / 4
if nworkers < 2 {
nworkers = 2
}
nworkers := obioptions.CLIWriteParallelWorkers()
opts = append(opts, obiformats.OptionsParallelWorkers(nworkers))
opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize()))

View File

@ -55,6 +55,8 @@ func JoinPairedSequence(seqA, seqB *obiseq.BioSequence, inplace bool) *obiseq.Bi
seqA = seqA.Copy()
}
seqA.Grow(seqB.Len() + 10)
seqA.WriteString("..........")
seqA.Write(seqB.Sequence())
@ -108,13 +110,16 @@ func JoinPairedSequence(seqA, seqB *obiseq.BioSequence, inplace bool) *obiseq.Bi
func AssemblePESequences(seqA, seqB *obiseq.BioSequence,
gap, scale float64, delta, minOverlap int, minIdentity float64, withStats bool,
inplace bool, fastAlign, fastModeRel bool,
arenaAlign obialign.PEAlignArena) *obiseq.BioSequence {
arenaAlign obialign.PEAlignArena, shifh_buff *map[int]int) *obiseq.BioSequence {
score, path, fastcount, over, fastscore := obialign.PEAlign(seqA, seqB,
score, path, fastcount, over, fastscore := obialign.PEAlign(
seqA, seqB,
gap, scale,
fastAlign, delta, fastModeRel,
arenaAlign)
cons, match := obialign.BuildQualityConsensus(seqA, seqB, path, true)
arenaAlign, shifh_buff,
)
cons, match := obialign.BuildQualityConsensus(seqA, seqB, path, true, arenaAlign)
left := path[0]
right := 0
@ -238,6 +243,7 @@ func IAssemblePESequencesBatch(iterator obiiter.IBioSequence,
f := func(iterator obiiter.IBioSequence, wid int) {
arena := obialign.MakePEAlignArena(150, 150)
shifts := make(map[int]int)
for iterator.Next() {
batch := iterator.Get()
@ -246,7 +252,7 @@ func IAssemblePESequencesBatch(iterator obiiter.IBioSequence,
B := A.PairedWith()
cons[i] = AssemblePESequences(A, B.ReverseComplement(true),
gap, scale,
delta, minOverlap, minIdentity, withStats, true, fastAlign, fastModeRel, arena)
delta, minOverlap, minIdentity, withStats, true, fastAlign, fastModeRel, arena, &shifts)
}
newIter.Push(obiiter.MakeBioSequenceBatch(
batch.Order(),

View File

@ -37,6 +37,7 @@ func IPCRTagPESequencesBatch(iterator obiiter.IBioSequence,
f := func(iterator obiiter.IBioSequence, wid int) {
arena := obialign.MakePEAlignArena(150, 150)
var err error
shifts := make(map[int]int)
for iterator.Next() {
batch := iterator.Get()
@ -46,7 +47,7 @@ func IPCRTagPESequencesBatch(iterator obiiter.IBioSequence,
A.Copy(), B.ReverseComplement(false),
gap, scale,
delta, minOverlap, minIdentity, withStats, true,
fastAlign, fastScoreRel, arena,
fastAlign, fastScoreRel, arena, &shifts,
)
consensus, err = ngsfilter.ExtractBarcode(consensus, true)