change the model for representing paired reads and extend its usage to other commands

This commit is contained in:
2023-02-23 23:35:58 +01:00
parent ebb05fcdf7
commit 072b85e155
23 changed files with 598 additions and 338 deletions

View File

@ -46,6 +46,7 @@ type _IBioSequence struct {
batch_size int32
sequence_format string
finished *abool.AtomicBool
paired bool
}
type IBioSequence struct {
@ -73,6 +74,7 @@ func MakeIBioSequence(sizes ...int) IBioSequence {
batch_size: -1,
sequence_format: "",
finished: abool.New(),
paired: false,
}
waiting := sync.WaitGroup{}
@ -199,6 +201,11 @@ func (iterator IBioSequence) Split() IBioSequence {
i.lock = &lock
newIter := IBioSequence{&i}
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}
@ -270,6 +277,7 @@ func (iterator IBioSequence) Finished() bool {
return iterator.pointer.finished.IsSet()
}
// Sorting the batches of sequences.
func (iterator IBioSequence) SortBatches(sizes ...int) IBioSequence {
buffsize := iterator.BufferSize()
@ -311,6 +319,10 @@ func (iterator IBioSequence) SortBatches(sizes ...int) IBioSequence {
newIter.Done()
}()
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}
@ -321,6 +333,11 @@ func (iterator IBioSequence) Concat(iterators ...IBioSequence) IBioSequence {
return iterator
}
allPaired := iterator.IsPaired()
for _, i := range iterators {
allPaired = allPaired && i.IsPaired()
}
buffsize := iterator.BufferSize()
newIter := MakeIBioSequence(buffsize)
@ -357,6 +374,10 @@ func (iterator IBioSequence) Concat(iterators ...IBioSequence) IBioSequence {
newIter.Done()
}()
if allPaired {
newIter.MarkAsPaired()
}
return newIter
}
@ -368,6 +389,12 @@ func (iterator IBioSequence) Pool(iterators ...IBioSequence) IBioSequence {
return iterator
}
allPaired := iterator.IsPaired()
for _, i := range iterators {
allPaired = allPaired && i.IsPaired()
}
nextCounter := goutils.AtomicCounter()
buffsize := iterator.BufferSize()
newIter := MakeIBioSequence(buffsize)
@ -392,6 +419,10 @@ func (iterator IBioSequence) Pool(iterators ...IBioSequence) IBioSequence {
go ff(i)
}
if allPaired {
newIter.MarkAsPaired()
}
return newIter
}
@ -441,6 +472,10 @@ func (iterator IBioSequence) Rebatch(size int, sizes ...int) IBioSequence {
}()
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}
@ -492,47 +527,6 @@ func (iterator IBioSequence) Count(recycle bool) (int, int, int) {
return variants, reads, nucleotides
}
// PairWith walks the receiver (forward reads) and reverse in lockstep and
// sends, for every batch of the receiver, a paired batch built from that
// batch and the next batch of reverse.
//
// Both inputs are first rebatched with the same batch size. The optional
// sizes are, in order, the batch size (default 5000) and the size handed to
// MakeIPairedBioSequenceBatch (default: the receiver's BufferSize).
//
// The pairing runs in its own goroutine, which panics through log.Panicln
// when reverse has no batch left while the receiver still has one.
func (iterator IBioSequence) PairWith(reverse IBioSequence,
	sizes ...int) IPairedBioSequenceBatch {

	bufferLen := iterator.BufferSize()
	batchLen := 5000

	switch {
	case len(sizes) > 1:
		batchLen, bufferLen = sizes[0], sizes[1]
	case len(sizes) > 0:
		batchLen = sizes[0]
	}

	forward := iterator.Rebatch(batchLen)
	mates := reverse.Rebatch(batchLen)

	paired := MakeIPairedBioSequenceBatch(bufferLen)
	paired.Add(1)

	// Closer: waits for the pairing goroutine below before closing the output.
	go func() {
		paired.WaitAndClose()
		log.Println("End of association of paired reads")
	}()

	log.Println("Start association of paired reads")

	// Producer: one paired batch per forward batch.
	go func() {
		for forward.Next() {
			if hasMate := mates.Next(); !hasMate {
				log.Panicln("Etrange reverse pas prêt")
			}

			paired.Channel() <- MakePairedBioSequenceBatch(forward.Get(),
				mates.Get())
		}

		paired.Done()
	}()

	return paired
}
// A function that takes a predicate and returns two IBioSequenceBatch iterators.
// Sequences extracted from the input iterator are distributed between the two
// iterators according to the value of the predicate.
@ -599,6 +593,10 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate,
falseIter.Done()
}()
if iterator.IsPaired() {
trueIter.MarkAsPaired()
falseIter.MarkAsPaired()
}
return trueIter, falseIter
}
@ -654,6 +652,71 @@ func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate,
go ff(iterator)
if iterator.IsPaired() {
trueIter.MarkAsPaired()
}
return trueIter.Rebatch(size)
}
// FilterAnd returns an iterator over the sequences of the receiver that
// satisfy predicate. When a sequence reports IsPaired, the predicate must also
// hold for the sequence returned by PairedWith: such a sequence is kept only
// if both evaluations succeed. Rejected sequences are recycled.
//
// size is the batch size passed to Rebatch on the result. The optional sizes
// are, in order, the number of worker goroutines (default 4) and the size
// handed to MakeIBioSequence for the intermediate iterator (default: the
// receiver's BufferSize). A worker count below 1 is treated as 1.
//
// The result is marked as paired when the receiver is.
func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate,
	size int, sizes ...int) IBioSequence {
	buffsize := iterator.BufferSize()
	nworkers := 4

	if len(sizes) > 0 {
		nworkers = sizes[0]
	}

	if len(sizes) > 1 {
		buffsize = sizes[1]
	}

	// One worker is always started on the receiver itself (see below), so the
	// count registered with Add must be at least one; otherwise the number of
	// registered workers and the number of Done calls disagree.
	if nworkers < 1 {
		nworkers = 1
	}

	trueIter := MakeIBioSequence(buffsize)

	trueIter.Add(nworkers)

	go func() {
		trueIter.WaitAndClose()
	}()

	// ff filters every batch of its input in place and forwards the
	// (possibly shortened) batch to trueIter.
	ff := func(input IBioSequence) {
		for input.Next() {
			seqs := input.Get()
			slice := seqs.slice
			j := 0
			for _, s := range slice {
				good := predicate(s)
				// The mate is only tested when the sequence itself passed.
				if s.IsPaired() {
					good = good && predicate(s.PairedWith())
				}
				if good {
					// Compact the kept sequences at the front of the slice.
					slice[j] = s
					j++
				} else {
					// NOTE(review): only s is recycled here; whether that
					// also releases its mate is not visible from this
					// function - confirm.
					s.Recycle()
				}
			}

			seqs.slice = slice[:j]
			trueIter.pointer.channel <- seqs
		}

		trueIter.Done()
	}

	// nworkers-1 workers consume splits of the receiver; the last one
	// consumes the receiver itself.
	for i := 1; i < nworkers; i++ {
		go ff(iterator.Split())
	}

	go ff(iterator)

	if iterator.IsPaired() {
		trueIter.MarkAsPaired()
	}

	return trueIter.Rebatch(size)
}
@ -673,13 +736,14 @@ func (iterator IBioSequence) Load() obiseq.BioSequenceSlice {
// It takes a slice of BioSequence objects, and returns an iterator that will return batches of
// BioSequence objects
func IBatchOver(data obiseq.BioSequenceSlice,
size int, sizes ...int) IBioSequence {
buffsize := 0
if len(sizes) > 0 {
buffsize = sizes[1]
buffsize = sizes[0]
}
newIter := MakeIBioSequence(buffsize)
@ -706,5 +770,8 @@ func IBatchOver(data obiseq.BioSequenceSlice,
newIter.Done()
}()
if data.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}