Patch for some lost of data during sequence writing

This commit is contained in:
2023-02-08 13:14:26 +01:00
parent 4117cbdd08
commit 526bf79c7f
16 changed files with 99 additions and 10 deletions

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiannotate" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiannotate"
@ -36,4 +37,7 @@ func main() {
sequences, _ := obiconvert.ReadBioSequences(args...) sequences, _ := obiconvert.ReadBioSequences(args...)
annotator := obiannotate.CLIAnnotationPipeline() annotator := obiannotate.CLIAnnotationPipeline()
obiconvert.WriteBioSequences(sequences.Pipe(annotator), true) obiconvert.WriteBioSequences(sequences.Pipe(annotator), true)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiclean" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiclean"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
@ -19,4 +20,7 @@ func main() {
cleaned := obiclean.IOBIClean(fs) cleaned := obiclean.IOBIClean(fs)
obiconvert.WriteBioSequences(cleaned, true) obiconvert.WriteBioSequences(cleaned, true)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
@ -18,4 +19,7 @@ func main() {
comp := fs.MakeIWorker(obiseq.ReverseComplementWorker(true)) comp := fs.MakeIWorker(obiseq.ReverseComplementWorker(true))
obiconvert.WriteBioSequences(comp, true) obiconvert.WriteBioSequences(comp, true)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
@ -15,4 +16,7 @@ func main() {
fs, _ := obiconvert.ReadBioSequences(args...) fs, _ := obiconvert.ReadBioSequences(args...)
obiconvert.WriteBioSequences(fs, true) obiconvert.WriteBioSequences(fs, true)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obidistribute" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obidistribute"
@ -16,4 +17,7 @@ func main() {
fs, _ := obiconvert.ReadBioSequences(args...) fs, _ := obiconvert.ReadBioSequences(args...)
obidistribute.DistributeSequence(fs) obidistribute.DistributeSequence(fs)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
@ -36,4 +37,6 @@ func main() {
sequences, _ := obiconvert.ReadBioSequences(args...) sequences, _ := obiconvert.ReadBioSequences(args...)
selected := obigrep.IFilterSequence(sequences) selected := obigrep.IFilterSequence(sequences)
obiconvert.WriteBioSequences(selected, true) obiconvert.WriteBioSequences(selected, true)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obimultiplex" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obimultiplex"
@ -32,4 +33,6 @@ func main() {
amplicons, _ := obimultiplex.IExtractBarcode(sequences) amplicons, _ := obimultiplex.IExtractBarcode(sequences)
obiconvert.WriteBioSequences(amplicons, true) obiconvert.WriteBioSequences(amplicons, true)
amplicons.Wait() amplicons.Wait()
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipairing" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipairing"
@ -39,4 +40,6 @@ func main() {
obioptions.CLIParallelWorkers(), obioptions.CLIParallelWorkers(),
) )
obiconvert.WriteBioSequences(paired, true) obiconvert.WriteBioSequences(paired, true)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipcr" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obipcr"
@ -33,4 +34,6 @@ func main() {
sequences, _ := obiconvert.ReadBioSequences(args...) sequences, _ := obiconvert.ReadBioSequences(args...)
amplicons, _ := obipcr.PCR(sequences) amplicons, _ := obipcr.PCR(sequences)
obiconvert.WriteBioSequences(amplicons, true) obiconvert.WriteBioSequences(amplicons, true)
obiiter.WaitForLastPipe()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obirefidx" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obirefidx"
@ -17,6 +18,7 @@ func main() {
fs, _ := obiconvert.ReadBioSequences(args...) fs, _ := obiconvert.ReadBioSequences(args...)
indexed := obirefidx.IndexReferenceDB(fs) indexed := obirefidx.IndexReferenceDB(fs)
written, _ := obiconvert.WriteBioSequences(indexed, false) obiconvert.WriteBioSequences(indexed, true)
written.Consume() obiiter.WaitForLastPipe()
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obitag" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obitag"
@ -36,6 +37,7 @@ func main() {
identified := obitag.AssignTaxonomy(fs) identified := obitag.AssignTaxonomy(fs)
obiconvert.WriteBioSequences(identified, true) obiconvert.WriteBioSequences(identified, true)
obiiter.WaitForLastPipe()
fmt.Println("") fmt.Println("")
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"os" "os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
@ -36,4 +37,7 @@ func main() {
sequences, _ := obiconvert.ReadBioSequences(args...) sequences, _ := obiconvert.ReadBioSequences(args...)
unique := obiuniq.Unique(sequences) unique := obiuniq.Unique(sequences)
obiconvert.WriteBioSequences(unique, true) obiconvert.WriteBioSequences(unique, true)
obiiter.WaitForLastPipe()
} }

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"os" "os"
"strings" "strings"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -71,6 +72,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers() nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunck) chunkchan := make(chan FileChunck)
header_format := opt.FormatFastSeqHeader() header_format := opt.FormatFastSeqHeader()
@ -79,7 +81,11 @@ func WriteFasta(iterator obiiter.IBioSequence,
go func() { go func() {
newIter.WaitAndClose() newIter.WaitAndClose()
for len(chunkchan) > 0 {
time.Sleep(time.Millisecond)
}
close(chunkchan) close(chunkchan)
obiiter.UnregisterPipe()
log.Debugln("End of the fasta file writing") log.Debugln("End of the fasta file writing")
}() }()

View File

@ -61,6 +61,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers() nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunck) chunkchan := make(chan FileChunck)
header_format := opt.FormatFastSeqHeader() header_format := opt.FormatFastSeqHeader()
@ -74,6 +75,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
close(chunkchan) close(chunkchan)
obiiter.UnregisterPipe()
log.Debugln("End of the fastq file writing") log.Debugln("End of the fastq file writing")
}() }()
@ -123,7 +125,6 @@ func WriteFastq(iterator obiiter.IBioSequence,
file.Close() file.Close()
} }
} }
}() }()
return newIter, nil return newIter, nil

View File

@ -15,6 +15,25 @@ import (
"github.com/tevino/abool/v2" "github.com/tevino/abool/v2"
) )
var globalLocker sync.WaitGroup
var globalLockerCounter = 0
func RegisterAPipe() {
globalLocker.Add(1)
globalLockerCounter++
log.Debugln(globalLockerCounter, " Pipes are registered now")
}
func UnregisterPipe() {
globalLocker.Done()
globalLockerCounter--
log.Debugln(globalLockerCounter, "are still registered")
}
func WaitForLastPipe() {
globalLocker.Wait()
}
// Structure implementing an iterator over bioseq.BioSequenceBatch // Structure implementing an iterator over bioseq.BioSequenceBatch
// based on a channel. // based on a channel.
type _IBioSequence struct { type _IBioSequence struct {
@ -61,6 +80,9 @@ func MakeIBioSequence(sizes ...int) IBioSequence {
lock := sync.RWMutex{} lock := sync.RWMutex{}
i.lock = &lock i.lock = &lock
ii := IBioSequence{&i} ii := IBioSequence{&i}
RegisterAPipe()
return ii return ii
} }
@ -229,6 +251,7 @@ func (iterator IBioSequence) Push(batch BioSequenceBatch) {
func (iterator IBioSequence) Close() { func (iterator IBioSequence) Close() {
close(iterator.pointer.channel) close(iterator.pointer.channel)
UnregisterPipe()
} }
func (iterator IBioSequence) WaitAndClose() { func (iterator IBioSequence) WaitAndClose() {
@ -237,6 +260,7 @@ func (iterator IBioSequence) WaitAndClose() {
for len(iterator.Channel()) > 0 { for len(iterator.Channel()) > 0 {
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
iterator.Close() iterator.Close()
} }
@ -258,20 +282,21 @@ func (iterator IBioSequence) SortBatches(sizes ...int) IBioSequence {
newIter.Add(1) newIter.Add(1)
go func() { go func() {
newIter.Wait() newIter.WaitAndClose()
close(newIter.pointer.channel)
}() }()
next_to_send := 0 next_to_send := 0
//log.Println("wait for batch #", next_to_send)
received := make(map[int]BioSequenceBatch) received := make(map[int]BioSequenceBatch)
go func() { go func() {
for iterator.Next() { for iterator.Next() {
batch := iterator.Get() batch := iterator.Get()
// log.Println("Pushd seq #", batch.order, next_to_send) // log.Println("\nPushd seq #\n", batch.order, next_to_send)
if batch.order == next_to_send { if batch.order == next_to_send {
newIter.pointer.channel <- batch newIter.pointer.channel <- batch
next_to_send++ next_to_send++
//log.Println("\nwait for batch #\n", next_to_send)
batch, ok := received[next_to_send] batch, ok := received[next_to_send]
for ok { for ok {
newIter.pointer.channel <- batch newIter.pointer.channel <- batch
@ -386,8 +411,7 @@ func (iterator IBioSequence) Rebatch(size int, sizes ...int) IBioSequence {
newIter.Add(1) newIter.Add(1)
go func() { go func() {
newIter.Wait() newIter.WaitAndClose()
close(newIter.pointer.channel)
}() }()
go func() { go func() {
@ -427,6 +451,7 @@ func (iterator IBioSequence) Recycle() {
for iterator.Next() { for iterator.Next() {
// iterator.Get() // iterator.Get()
batch := iterator.Get() batch := iterator.Get()
log.Debugln("Recycling batch #", batch.Order())
for _, seq := range batch.Slice() { for _, seq := range batch.Slice() {
seq.Recycle() seq.Recycle()
recycled++ recycled++
@ -488,8 +513,7 @@ func (iterator IBioSequence) PairWith(reverse IBioSequence,
newIter.Add(1) newIter.Add(1)
go func() { go func() {
newIter.Wait() newIter.WaitAndClose()
close(newIter.Channel())
log.Println("End of association of paired reads") log.Println("End of association of paired reads")
}() }()

View File

@ -2,6 +2,7 @@ package obiiter
import ( import (
"sync" "sync"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -96,6 +97,8 @@ func MakeIPairedBioSequenceBatch(sizes ...int) IPairedBioSequenceBatch {
waiting := sync.WaitGroup{} waiting := sync.WaitGroup{}
i.all_done = &waiting i.all_done = &waiting
ii := IPairedBioSequenceBatch{&i} ii := IPairedBioSequenceBatch{&i}
RegisterAPipe()
return ii return ii
} }
@ -115,6 +118,21 @@ func (iterator IPairedBioSequenceBatch) Channel() chan PairedBioSequenceBatch {
return iterator.pointer.channel return iterator.pointer.channel
} }
func (iterator IPairedBioSequenceBatch) Close() {
close(iterator.pointer.channel)
UnregisterPipe()
}
func (iterator IPairedBioSequenceBatch) WaitAndClose() {
iterator.Wait()
for len(iterator.Channel()) > 0 {
time.Sleep(time.Millisecond)
}
iterator.Close()
}
func (iterator IPairedBioSequenceBatch) IsNil() bool { func (iterator IPairedBioSequenceBatch) IsNil() bool {
return iterator.pointer == nil return iterator.pointer == nil
} }