Change the memory management for the BuildAlignment function

This commit is contained in:
2022-01-15 19:10:16 +01:00
parent e8fff6477b
commit e1b7e1761c
11 changed files with 145 additions and 1329 deletions

View File

@ -14,5 +14,5 @@ func main() {
_, args, _ := optionParser(os.Args)
fs, _ := obiconvert.ReadBioSequencesBatch(args...)
obiconvert.WriteBioSequencesBatch(fs,true)
obiconvert.WriteBioSequencesBatch(fs, true)
}

View File

@ -1,7 +1,9 @@
package main
import (
"log"
"os"
"runtime/trace"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
@ -19,12 +21,12 @@ func main() {
// defer pprof.StopCPUProfile()
// go tool trace cpu.trace
// ftrace, err := os.Create("cpu.trace")
// if err != nil {
// log.Fatal(err)
// }
// trace.Start(ftrace)
// defer trace.Stop()
ftrace, err := os.Create("cpu.trace")
if err != nil {
log.Fatal(err)
}
trace.Start(ftrace)
defer trace.Stop()
optionParser := obioptions.GenerateOptionParser(obipairing.OptionSet)

View File

@ -30,5 +30,5 @@ func main() {
sequences, _ := obiconvert.ReadBioSequencesBatch(args...)
amplicons, _ := obipcr.PCR(sequences)
obiconvert.WriteBioSequencesBatch(amplicons,true)
obiconvert.WriteBioSequencesBatch(amplicons, true)
}

13
go.mod
View File

@ -3,13 +3,16 @@ module git.metabarcoding.org/lecasofts/go/obitools
go 1.17
require (
github.com/DavidGamba/go-getoptions v0.25.0 // indirect
github.com/goccy/go-json v0.9.1 // indirect
github.com/DavidGamba/go-getoptions v0.25.0
github.com/goccy/go-json v0.9.2
github.com/schollz/progressbar/v3 v3.8.5
)
require (
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/schollz/progressbar/v3 v3.8.5 // indirect
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
)

1207
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -1,51 +1,24 @@
// obialign : function for aligning two sequences
//
// The obialign package provides a set of functions
// for aligning two objects of type obiseq.BioSequence.
package obialign
import (
"math"
"sync"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
type _BuildAlignArena struct {
bufferA []byte
bufferB []byte
var _BuildAlignArenaPool = sync.Pool{
New: func() interface{} {
bs := make([]byte, 0, 300)
return &bs
},
}
// BuildAlignArena defines a memory arena usable by the
// BuildAlignment function. The same arena can be reused
// from alignment to alignment to limit the memory allocation
// and deallocation process.
type BuildAlignArena struct {
pointer *_BuildAlignArena
}
// NilBuildAlignArena is the nil instance of the BuildAlignArena
// type.
var NilBuildAlignArena = BuildAlignArena{nil}
// MakeBuildAlignArena makes a new arena for aligning two sequences
// of maximum length indicated by lseqA and lseqB.
func MakeBuildAlignArena(lseqA, lseqB int) BuildAlignArena {
a := _BuildAlignArena{
bufferA: make([]byte, lseqA+lseqB),
bufferB: make([]byte, lseqA+lseqB),
}
return BuildAlignArena{&a}
}
func _BuildAlignment(seqA, seqB []byte, path []int, gap byte,
bufferA, bufferB *[]byte) ([]byte, []byte) {
if bufferA == nil {
b := make([]byte, 0, len(seqA)+len(seqB))
bufferA = &b
}
if bufferB == nil {
b := make([]byte, 0, len(seqA)+len(seqB))
bufferB = &b
}
func _BuildAlignment(seqA, seqB []byte, path []int, gap byte, bufferA, bufferB *[]byte) {
*bufferA = (*bufferA)[:0]
*bufferB = (*bufferB)[:0]
@ -81,7 +54,6 @@ func _BuildAlignment(seqA, seqB []byte, path []int, gap byte,
}
}
return *bufferA, *bufferB
}
// BuildAlignment builds the aligned sequences from an alignment path
@ -94,27 +66,37 @@ func _BuildAlignment(seqA, seqB []byte, path []int, gap byte,
// be reusable for other alignments and are deallocated when BuildAlignment
// returns.
func BuildAlignment(seqA, seqB obiseq.BioSequence,
path []int, gap byte, arena BuildAlignArena) (obiseq.BioSequence, obiseq.BioSequence) {
path []int, gap byte) (obiseq.BioSequence, obiseq.BioSequence) {
if arena.pointer == nil {
arena = MakeBuildAlignArena(seqA.Length(), seqB.Length())
}
bufferSA := _BuildAlignArenaPool.Get().(*[]byte)
defer _BuildAlignArenaPool.Put(bufferSA)
A, B := _BuildAlignment(seqA.Sequence(), seqB.Sequence(), path, gap,
&arena.pointer.bufferA,
&arena.pointer.bufferB)
bufferSB := _BuildAlignArenaPool.Get().(*[]byte)
defer _BuildAlignArenaPool.Put(bufferSB)
_BuildAlignment(seqA.Sequence(), seqB.Sequence(), path, gap,
bufferSA,
bufferSB)
seqA = obiseq.MakeBioSequence(seqA.Id(),
A,
*bufferSA,
seqA.Definition())
seqB = obiseq.MakeBioSequence(seqB.Id(),
B,
*bufferSB,
seqB.Definition())
return seqA, seqB
}
// func _logSlice(x *[]byte) {
// l := len(*x)
// if l > 10 {
// l = 10
// }
// log.Printf("%v (%10s): slice=%p array=%p cap=%d len=%d\n", (*x)[:l], string((*x)[:l]), x, (*x), cap(*x), len(*x))
// }
// BuildQualityConsensus builds the consensus sequences corresponding to an
// alignment between two sequences.
// The consensus is built from an alignment path returned by one of the
@ -128,26 +110,34 @@ func BuildAlignment(seqA, seqB obiseq.BioSequence,
// In that case arenas will be allocated by the function, but they will not
// be reusable for other alignments and are deallocated when BuildQualityConsensus
// returns.
func BuildQualityConsensus(seqA, seqB obiseq.BioSequence, path []int,
arena1, arena2 BuildAlignArena) (obiseq.BioSequence, int) {
func BuildQualityConsensus(seqA, seqB obiseq.BioSequence, path []int) (obiseq.BioSequence, int) {
if arena1.pointer == nil {
arena1 = MakeBuildAlignArena(seqA.Length(), seqB.Length())
}
if arena2.pointer == nil {
arena2 = MakeBuildAlignArena(seqA.Length(), seqB.Length())
}
bufferSA := _BuildAlignArenaPool.Get().(*[]byte)
defer _BuildAlignArenaPool.Put(bufferSA)
sA, sB := _BuildAlignment(seqA.Sequence(), seqB.Sequence(), path, ' ',
&arena1.pointer.bufferA,
&arena1.pointer.bufferB)
bufferSB := _BuildAlignArenaPool.Get().(*[]byte)
defer _BuildAlignArenaPool.Put(bufferSB)
qsA, qsB := _BuildAlignment(seqA.Qualities(), seqB.Qualities(), path, byte(0),
&arena2.pointer.bufferA,
&arena2.pointer.bufferB)
bufferQA := _BuildAlignArenaPool.Get().(*[]byte)
defer _BuildAlignArenaPool.Put(bufferQA)
consensus := make([]byte, 0, len(sA))
qualities := make([]byte, 0, len(sA))
bufferQB := _BuildAlignArenaPool.Get().(*[]byte)
defer _BuildAlignArenaPool.Put(bufferQB)
_BuildAlignment(seqA.Sequence(), seqB.Sequence(), path, ' ',
bufferSA, bufferSB)
// log.Printf("#1 %s--> la : %d,%p lb : %d,%p qa : %d,%p qb : %d,%p\n", stamp,
// len(*bufferSA), bufferSA, len(*bufferSB), bufferSB,
// len(*bufferQA), bufferQA, len(*bufferQB), bufferQB)
_BuildAlignment(seqA.Qualities(), seqB.Qualities(), path, byte(0),
bufferQA, bufferQB)
// log.Printf("#2 %s--> la : %d,%p lb : %d,%p qa : %d,%p qb : %d,%p\n", stamp,
// len(*bufferSA), bufferSA, len(*bufferSB), bufferSB,
// len(*bufferQA), bufferQA, len(*bufferQB), bufferQB)
// log.Printf("#3 %s--> la : %d lb : %d, qa : %d qb : %d\n", stamp, len(sA), len(sB), len(qsA), len(qsB))
var qA, qB byte
var qM, qm byte
@ -155,31 +145,32 @@ func BuildQualityConsensus(seqA, seqB obiseq.BioSequence, path []int,
match := 0
for i, qA = range qsA {
qB = qsB[i]
for i, qA = range *bufferQA {
nA := (*bufferSA)[i]
nB := (*bufferSB)[i]
qB = (*bufferQB)[i]
if qA > qB {
consensus = append(consensus, sA[i])
qM = qA
qm = qB
}
if qB > qA {
consensus = append(consensus, sB[i])
(*bufferSA)[i] = (*bufferSB)[i]
qM = qB
qm = qA
}
if qB == qA {
nuc := _FourBitsBaseCode[sA[i]&31] | _FourBitsBaseCode[sB[i]&31]
consensus = append(consensus, _FourBitsBaseDecode[nuc])
if qB == qA && nA != nB {
nuc := _FourBitsBaseCode[nA&31] | _FourBitsBaseCode[nB&31]
(*bufferSA)[i] = _FourBitsBaseDecode[nuc]
}
q := qA + qB
if qA > 0 && qB > 0 {
if sA[i] != sB[i] {
if nA != nB {
q = qM - byte(math.Log10(1-math.Pow(10, -float64(qm)/30))*10+0.5)
}
if sA[i] == sB[i] {
if nA == nB {
match++
}
}
@ -187,11 +178,16 @@ func BuildQualityConsensus(seqA, seqB obiseq.BioSequence, path []int,
if q > 90 {
q = 90
}
qualities = append(qualities, q)
(*bufferQA)[i] = q
}
seq := obiseq.MakeBioSequence(seqA.Id(), consensus, seqA.Definition())
seq.SetQualities(qualities)
consSeq := obiseq.MakeBioSequence(
seqA.Id(),
(*bufferSA),
seqA.Definition(),
)
consSeq.SetSequence((*bufferQA))
return seq, match
return consSeq, match
}

View File

@ -61,11 +61,10 @@ func _GetMatrixFrom(matrix *[]int, lenA, a, b int) (int, int, int) {
func _PairingScorePeAlign(baseA, qualA, baseB, qualB byte) int {
partMatch := _NucPartMatch[baseA&31][baseB&31]
// log.Printf("id : %f A : %s %d B : %s %d\n", part_match, string(baseA), qualA, string(baseB), qualB)
switch {
case partMatch == 1:
// log.Printf("match\n")
switch int(partMatch * 100) {
case 100:
return _NucScorePartMatchMatch[qualA][qualB]
case partMatch == 0:
case 0:
return _NucScorePartMatchMismatch[qualA][qualB]
default:
return int(partMatch*float64(_NucScorePartMatchMatch[qualA][qualB]) +

View File

@ -54,7 +54,7 @@ func WriteSequencesToStdout(iterator obiseq.IBioSequence, options ...WithOption)
func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch,
file io.Writer,
options ...WithOption) (obiseq.IBioSequenceBatch,error) {
options ...WithOption) (obiseq.IBioSequenceBatch, error) {
var newIter obiseq.IBioSequenceBatch
var err error
@ -65,25 +65,25 @@ func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch,
iterator.PushBack()
batch := iterator.Get()
if batch.Slice()[0].HasQualities() {
newIter,err = WriteFastqBatch(iterator, file, options...)
newIter, err = WriteFastqBatch(iterator, file, options...)
} else {
newIter,err = WriteFastaBatch(iterator, file, options...)
newIter, err = WriteFastaBatch(iterator, file, options...)
}
return newIter,err
return newIter, err
}
return obiseq.NilIBioSequenceBatch,fmt.Errorf("input iterator not ready")
return obiseq.NilIBioSequenceBatch, fmt.Errorf("input iterator not ready")
}
func WriteSequencesBatchToStdout(iterator obiseq.IBioSequenceBatch,
options ...WithOption) (obiseq.IBioSequenceBatch,error) {
func WriteSequencesBatchToStdout(iterator obiseq.IBioSequenceBatch,
options ...WithOption) (obiseq.IBioSequenceBatch, error) {
return WriteSequenceBatch(iterator, os.Stdout, options...)
}
func WriteSequencesBatchToFile(iterator obiseq.IBioSequenceBatch,
filename string,
options ...WithOption) (obiseq.IBioSequenceBatch,error) {
options ...WithOption) (obiseq.IBioSequenceBatch, error) {
file, err := os.Create(filename)

View File

@ -172,6 +172,10 @@ func (s BioSequence) WriteQualities(data []byte) (int, error) {
return s.sequence.qualities.Write(data)
}
// WriteByteQualities forwards the single byte data to the WriteByte
// method of the sequence's qualities field and returns the error
// reported by that call.
func (s BioSequence) WriteByteQualities(data byte) error {
return s.sequence.qualities.WriteByte(data)
}
// Write forwards data to the Write method of the underlying sequence
// field and returns the byte count and error reported by that call.
func (s BioSequence) Write(data []byte) (int, error) {
return s.sequence.sequence.Write(data)
}

View File

@ -127,7 +127,7 @@ func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes
}
log.Println("Start of the batch slice workers")
for i := 0; i < nworkers - 1; i++ {
for i := 0; i < nworkers-1; i++ {
go f(iterator.Split())
}
go f(iterator)

View File

@ -19,6 +19,13 @@ func _Abs(x int) int {
return x
}
// JoinPairedSequence pastes two sequences, putting 10 dots as separator.
// If both sequences have quality scores, a quality of 0 is associated
// to each added dot.
// If the inplace argument is set to 'true', the memory allocated to the
// provided sequences is reused to limit reallocation. The two sequences
// provided as arguments can therefore no longer be used after
// JoinPairedSequence returns. You do not even have to recycle them.
func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioSequence {
if !inplace {
@ -28,8 +35,14 @@ func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioS
seqA.WriteString("..........")
seqA.Write(seqB.Sequence())
seqA.WriteQualities(obiseq.Quality{0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
seqA.WriteQualities(seqB.Qualities())
if seqA.HasQualities() && seqB.HasQualities() {
seqA.WriteQualities(obiseq.Quality{0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
seqA.WriteQualities(seqB.Qualities())
}
if inplace {
(&seqB).Recycle()
}
return seqA
}
@ -38,21 +51,18 @@ func JoinPairedSequence(seqA, seqB obiseq.BioSequence, inplace bool) obiseq.BioS
// the obipairing strategy implemented in obialign.PEAlign using
// the gap and delta parameters.
// If the length of the overlap between both sequences is less than
// overlap_min, the alignment is substituted by a simple pasting
// overlapMin, the alignment is substituted by a simple pasting
// of the sequences with a stretch of 10 dots in between them.
// The quality of the dots is set to 0.
// If the inplace parameter is set to true, seqA and seqB are
// destroyed during the assembling process and cannot be reused later on.
func AssemblePESequences(seqA, seqB obiseq.BioSequence,
gap, delta, overlap_min int, with_stats bool,
gap, delta, overlapMin int, withStats bool,
inplace bool,
arena_align obialign.PEAlignArena,
arena_cons obialign.BuildAlignArena,
arena_qual obialign.BuildAlignArena) obiseq.BioSequence {
arenaAlign obialign.PEAlignArena) obiseq.BioSequence {
score, path := obialign.PEAlign(seqA, seqB, gap, delta, arena_align)
cons, match := obialign.BuildQualityConsensus(seqA, seqB, path,
arena_cons, arena_qual)
score, path := obialign.PEAlign(seqA, seqB, gap, delta, arenaAlign)
cons, match := obialign.BuildQualityConsensus(seqA, seqB, path)
left := path[0]
right := 0
@ -60,10 +70,10 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence,
right = path[len(path)-2]
}
lcons := cons.Length()
ali_length := lcons - _Abs(left) - _Abs(right)
aliLength := lcons - _Abs(left) - _Abs(right)
if ali_length >= overlap_min {
if with_stats {
if aliLength >= overlapMin {
if withStats {
annot := cons.Annotations()
annot["mode"] = "alignment"
annot["score"] = score
@ -83,14 +93,14 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence,
annot["seq_b_single"] = right
}
score_norm := float64(0)
if ali_length > 0 {
score_norm = math.Round(float64(match)/float64(ali_length)*1000) / 1000
scoreNorm := float64(0)
if aliLength > 0 {
scoreNorm = math.Round(float64(match)/float64(aliLength)*1000) / 1000
}
annot["ali_length"] = ali_length
annot["ali_length"] = aliLength
annot["seq_ab_match"] = match
annot["score_norm"] = score_norm
annot["score_norm"] = scoreNorm
if inplace {
(&seqA).Recycle()
@ -100,21 +110,18 @@ func AssemblePESequences(seqA, seqB obiseq.BioSequence,
} else {
cons = JoinPairedSequence(seqA, seqB, inplace)
if with_stats {
if withStats {
annot := cons.Annotations()
annot["mode"] = "join"
}
if inplace {
(&seqB).Recycle()
}
}
return cons
}
func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch,
gap, delta, overlap_min int, with_stats bool, sizes ...int) obiseq.IBioSequenceBatch {
gap, delta, overlapMin int, withStats bool, sizes ...int) obiseq.IBioSequenceBatch {
nworkers := runtime.NumCPU() - 1
buffsize := iterator.BufferSize()
@ -150,8 +157,6 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch,
f := func(iterator obiseq.IPairedBioSequenceBatch, wid int) {
arena := obialign.MakePEAlignArena(150, 150)
barena1 := obialign.MakeBuildAlignArena(150, 150)
barena2 := obialign.MakeBuildAlignArena(150, 150)
// log.Printf("\n==> %d Wait data to align\n", wid)
// start := time.Now()
@ -163,7 +168,7 @@ func IAssemblePESequencesBatch(iterator obiseq.IPairedBioSequenceBatch,
processed := 0
for i, A := range batch.Forward() {
B := batch.Reverse()[i]
cons[i] = AssemblePESequences(A, B, 2, 5, 20, true, true, arena, barena1, barena2)
cons[i] = AssemblePESequences(A, B, 2, 5, 20, true, true, arena)
if i%59 == 0 {
bar.Add(59)
processed += 59