A first version of obigrep. Normally fully functionnal, but not fully tested

This commit is contained in:
2022-02-25 07:29:52 +01:00
parent 0a36447e2a
commit 011898bd9d
11 changed files with 709 additions and 772 deletions

View File

@@ -1,533 +0,0 @@
package obiseq
import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/tevino/abool/v2"
)
type BioSequenceBatch struct {
slice BioSequenceSlice
order int
}
var NilBioSequenceBatch = BioSequenceBatch{nil, -1}
func MakeBioSequenceBatch(order int, sequences BioSequenceSlice) BioSequenceBatch {
return BioSequenceBatch{
slice: sequences,
order: order,
}
}
func (batch BioSequenceBatch) Order() int {
return batch.order
}
func (batch BioSequenceBatch) Reorder(newOrder int) BioSequenceBatch {
batch.order = newOrder
return batch
}
func (batch BioSequenceBatch) Slice() BioSequenceSlice {
return batch.slice
}
func (batch BioSequenceBatch) Length() int {
return len(batch.slice)
}
func (batch BioSequenceBatch) NotEmpty() bool {
return batch.slice.NotEmpty()
}
func (batch BioSequenceBatch) Pop0() *BioSequence {
return batch.slice.Pop0()
}
func (batch BioSequenceBatch) IsNil() bool {
return batch.slice == nil
}
func (batch BioSequenceBatch) Recycle() {
batch.slice.Recycle()
batch.slice = nil
}
// Structure implementing an iterator over bioseq.BioSequenceBatch
// based on a channel.
type _IBioSequenceBatch struct {
channel chan BioSequenceBatch
current BioSequenceBatch
pushBack *abool.AtomicBool
all_done *sync.WaitGroup
lock *sync.RWMutex
buffer_size int32
batch_size int32
sequence_format string
finished *abool.AtomicBool
}
type IBioSequenceBatch struct {
pointer *_IBioSequenceBatch
}
var NilIBioSequenceBatch = IBioSequenceBatch{pointer: nil}
func MakeIBioSequenceBatch(sizes ...int) IBioSequenceBatch {
buffsize := int32(1)
if len(sizes) > 0 {
buffsize = int32(sizes[0])
}
i := _IBioSequenceBatch{
channel: make(chan BioSequenceBatch, buffsize),
current: NilBioSequenceBatch,
pushBack: abool.New(),
buffer_size: buffsize,
batch_size: -1,
sequence_format: "",
finished: abool.New(),
}
waiting := sync.WaitGroup{}
i.all_done = &waiting
lock := sync.RWMutex{}
i.lock = &lock
ii := IBioSequenceBatch{&i}
return ii
}
func (iterator IBioSequenceBatch) Add(n int) {
iterator.pointer.all_done.Add(n)
}
func (iterator IBioSequenceBatch) Done() {
iterator.pointer.all_done.Done()
}
func (iterator IBioSequenceBatch) Unlock() {
iterator.pointer.lock.Unlock()
}
func (iterator IBioSequenceBatch) Lock() {
iterator.pointer.lock.Lock()
}
func (iterator IBioSequenceBatch) RLock() {
iterator.pointer.lock.RLock()
}
func (iterator IBioSequenceBatch) RUnlock() {
iterator.pointer.lock.RUnlock()
}
func (iterator IBioSequenceBatch) Wait() {
iterator.pointer.all_done.Wait()
}
func (iterator IBioSequenceBatch) Channel() chan BioSequenceBatch {
return iterator.pointer.channel
}
func (iterator IBioSequenceBatch) IsNil() bool {
return iterator.pointer == nil
}
func (iterator IBioSequenceBatch) BufferSize() int {
return int(atomic.LoadInt32(&iterator.pointer.buffer_size))
}
func (iterator IBioSequenceBatch) BatchSize() int {
return int(atomic.LoadInt32(&iterator.pointer.batch_size))
}
func (iterator IBioSequenceBatch) SetBatchSize(size int) error {
if size >= 0 {
atomic.StoreInt32(&iterator.pointer.batch_size, int32(size))
return nil
}
return fmt.Errorf("size (%d) cannot be negative", size)
}
func (iterator IBioSequenceBatch) Split() IBioSequenceBatch {
iterator.pointer.lock.RLock()
defer iterator.pointer.lock.RUnlock()
i := _IBioSequenceBatch{
channel: iterator.pointer.channel,
current: NilBioSequenceBatch,
pushBack: abool.New(),
all_done: iterator.pointer.all_done,
buffer_size: iterator.pointer.buffer_size,
batch_size: iterator.pointer.batch_size,
sequence_format: iterator.pointer.sequence_format,
finished: iterator.pointer.finished}
lock := sync.RWMutex{}
i.lock = &lock
newIter := IBioSequenceBatch{&i}
return newIter
}
func (iterator IBioSequenceBatch) Next() bool {
if iterator.pointer.pushBack.IsSet() {
iterator.pointer.pushBack.UnSet()
return true
}
if iterator.pointer.finished.IsSet() {
return false
}
next, ok := (<-iterator.pointer.channel)
if ok {
iterator.pointer.current = next
return true
}
iterator.pointer.current = NilBioSequenceBatch
iterator.pointer.finished.Set()
return false
}
func (iterator IBioSequenceBatch) PushBack() {
if !iterator.pointer.current.IsNil() {
iterator.pointer.pushBack.Set()
}
}
// The 'Get' method returns the instance of BioSequenceBatch
// currently pointed by the iterator. You have to use the
// 'Next' method to move to the next entry before calling
// 'Get' to retreive the following instance.
func (iterator IBioSequenceBatch) Get() BioSequenceBatch {
return iterator.pointer.current
}
func (iterator IBioSequenceBatch) Push(batch BioSequenceBatch) {
if batch.IsNil() {
log.Panicln("An Nil batch is pushed on the channel")
}
if batch.Length() == 0 {
log.Panicln("An empty batch is pushed on the channel")
}
iterator.pointer.channel <- batch
}
func (iterator IBioSequenceBatch) Close() {
close(iterator.pointer.channel)
}
func (iterator IBioSequenceBatch) WaitAndClose() {
iterator.Wait()
for len(iterator.Channel()) > 0 {
time.Sleep(time.Millisecond)
}
iterator.Close()
}
// Finished returns 'true' value if no more data is available
// from the iterator.
func (iterator IBioSequenceBatch) Finished() bool {
return iterator.pointer.finished.IsSet()
}
func (iterator IBioSequenceBatch) IBioSequence(sizes ...int) IBioSequence {
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
buffsize = sizes[0]
}
newIter := MakeIBioSequence(buffsize)
newIter.Add(1)
go func() {
newIter.Wait()
close(newIter.pointer.channel)
}()
go func() {
for iterator.Next() {
batch := iterator.Get()
for batch.NotEmpty() {
newIter.pointer.channel <- batch.Pop0()
}
batch.Recycle()
}
newIter.Done()
}()
return newIter
}
func (iterator IBioSequenceBatch) SortBatches(sizes ...int) IBioSequenceBatch {
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
buffsize = sizes[0]
}
newIter := MakeIBioSequenceBatch(buffsize)
newIter.Add(1)
go func() {
newIter.Wait()
close(newIter.pointer.channel)
}()
next_to_send := 0
received := make(map[int]BioSequenceBatch)
go func() {
for iterator.Next() {
batch := iterator.Get()
if batch.order == next_to_send {
newIter.pointer.channel <- batch
next_to_send++
batch, ok := received[next_to_send]
for ok {
newIter.pointer.channel <- batch
delete(received, next_to_send)
next_to_send++
batch, ok = received[next_to_send]
}
} else {
received[batch.order] = batch
}
}
newIter.Done()
}()
return newIter
}
func (iterator IBioSequenceBatch) Concat(iterators ...IBioSequenceBatch) IBioSequenceBatch {
if len(iterators) == 0 {
return iterator
}
buffsize := iterator.BufferSize()
newIter := MakeIBioSequenceBatch(buffsize)
newIter.Add(1)
go func() {
newIter.Wait()
close(newIter.Channel())
}()
go func() {
previous_max := 0
max_order := 0
for iterator.Next() {
s := iterator.Get()
if s.order > max_order {
max_order = s.order
}
newIter.Push(s.Reorder(s.order + previous_max))
}
previous_max = max_order + 1
for _, iter := range iterators {
for iter.Next() {
s := iter.Get()
if (s.order + previous_max) > max_order {
max_order = s.order + previous_max
}
newIter.Push(s.Reorder(s.order + previous_max))
}
previous_max = max_order + 1
}
newIter.Done()
}()
return newIter
}
// Redistributes sequences from a IBioSequenceBatch into a new
// IBioSequenceBatch with every batches having the same size
// indicated in parameter. Rebatching implies to sort the
// source IBioSequenceBatch.
func (iterator IBioSequenceBatch) Rebatch(size int, sizes ...int) IBioSequenceBatch {
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
buffsize = sizes[0]
}
newIter := MakeIBioSequenceBatch(buffsize)
newIter.Add(1)
go func() {
newIter.Wait()
close(newIter.pointer.channel)
}()
go func() {
order := 0
iterator = iterator.SortBatches()
buffer := MakeBioSequenceSlice()
for iterator.Next() {
seqs := iterator.Get()
for _, s := range seqs.slice {
buffer = append(buffer, s)
if len(buffer) == size {
newIter.Push(MakeBioSequenceBatch(order, buffer))
order++
buffer = MakeBioSequenceSlice()
}
}
seqs.Recycle()
}
if len(buffer) > 0 {
newIter.Push(MakeBioSequenceBatch(order, buffer))
}
newIter.Done()
}()
return newIter
}
func (iterator IBioSequenceBatch) Recycle() {
log.Println("Start recycling of Bioseq objects")
recycled := 0
for iterator.Next() {
// iterator.Get()
batch := iterator.Get()
for _, seq := range batch.Slice() {
seq.Recycle()
recycled++
}
batch.Recycle()
}
log.Printf("End of the recycling of %d Bioseq objects", recycled)
}
func (iterator IBioSequenceBatch) PairWith(reverse IBioSequenceBatch, sizes ...int) IPairedBioSequenceBatch {
buffsize := iterator.BufferSize()
batchsize := 5000
if len(sizes) > 0 {
batchsize = sizes[0]
}
if len(sizes) > 1 {
buffsize = sizes[1]
}
iterator = iterator.Rebatch(batchsize)
reverse = reverse.Rebatch(batchsize)
newIter := MakeIPairedBioSequenceBatch(buffsize)
newIter.Add(1)
go func() {
newIter.Wait()
close(newIter.pointer.channel)
log.Println("End of association of paired reads")
}()
log.Println("Start association of paired reads")
go func() {
for iterator.Next() {
if !reverse.Next() {
log.Panicln("Etrange reverse pas prêt")
}
newIter.Channel() <- MakePairedBioSequenceBatch(iterator.Get(),
reverse.Get())
}
newIter.Done()
}()
return newIter
}
func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate,
size int, sizes ...int) (IBioSequenceBatch, IBioSequenceBatch) {
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
buffsize = sizes[0]
}
trueIter := MakeIBioSequenceBatch(buffsize)
falseIter := MakeIBioSequenceBatch(buffsize)
trueIter.Add(1)
falseIter.Add(1)
go func() {
trueIter.WaitAndClose()
falseIter.WaitAndClose()
}()
go func() {
trueOrder := 0
falseOrder := 0
iterator = iterator.SortBatches()
trueSlice := MakeBioSequenceSlice()
falseSlice := MakeBioSequenceSlice()
for iterator.Next() {
seqs := iterator.Get()
for _, s := range seqs.slice {
if predicate(s) {
trueSlice = append(trueSlice, s)
} else {
falseSlice = append(falseSlice, s)
}
if len(trueSlice) == size {
trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice))
trueOrder++
trueSlice = MakeBioSequenceSlice()
}
if len(falseSlice) == size {
falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice))
falseOrder++
falseSlice = MakeBioSequenceSlice()
}
}
seqs.Recycle()
}
if len(trueSlice) > 0 {
trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice))
}
if len(falseSlice) > 0 {
falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice))
}
trueIter.Done()
falseIter.Done()
}()
return trueIter, falseIter
}

View File

@@ -1,213 +0,0 @@
package obiseq
import (
"log"
"sync"
)
type PairedBioSequenceBatch struct {
forward BioSequenceSlice
reverse BioSequenceSlice
order int
}
var NilPairedBioSequenceBatch = PairedBioSequenceBatch{nil, nil, -1}
func MakePairedBioSequenceBatch(forward, reverse BioSequenceBatch) PairedBioSequenceBatch {
if forward.order != reverse.order {
log.Fatalf("Forward order : %d and reverse order : %d are not matching",
forward.order, reverse.order)
}
for i := range reverse.slice {
reverse.slice[i].ReverseComplement(true)
}
return PairedBioSequenceBatch{
forward: forward.slice,
reverse: reverse.slice,
order: forward.order,
}
}
func (batch PairedBioSequenceBatch) Order() int {
return batch.order
}
func (batch PairedBioSequenceBatch) Length() int {
return len(batch.forward)
}
func (batch PairedBioSequenceBatch) Forward() BioSequenceSlice {
return batch.forward
}
func (batch PairedBioSequenceBatch) Reverse() BioSequenceSlice {
return batch.reverse
}
func (batch PairedBioSequenceBatch) IsNil() bool {
return batch.forward == nil
}
// Structure implementing an iterator over bioseq.BioSequenceBatch
// based on a channel.
type __ipairedbiosequencebatch__ struct {
channel chan PairedBioSequenceBatch
current PairedBioSequenceBatch
pushBack bool
all_done *sync.WaitGroup
buffer_size int
finished bool
p_finished *bool
}
type IPairedBioSequenceBatch struct {
pointer *__ipairedbiosequencebatch__
}
var NilIPairedBioSequenceBatch = IPairedBioSequenceBatch{pointer: nil}
func MakeIPairedBioSequenceBatch(sizes ...int) IPairedBioSequenceBatch {
buffsize := 1
if len(sizes) > 0 {
buffsize = sizes[0]
}
i := __ipairedbiosequencebatch__{
channel: make(chan PairedBioSequenceBatch, buffsize),
current: NilPairedBioSequenceBatch,
pushBack: false,
buffer_size: buffsize,
finished: false,
p_finished: nil,
}
i.p_finished = &i.finished
waiting := sync.WaitGroup{}
i.all_done = &waiting
ii := IPairedBioSequenceBatch{&i}
return ii
}
func (iterator IPairedBioSequenceBatch) Add(n int) {
iterator.pointer.all_done.Add(n)
}
func (iterator IPairedBioSequenceBatch) Done() {
iterator.pointer.all_done.Done()
}
func (iterator IPairedBioSequenceBatch) Wait() {
iterator.pointer.all_done.Wait()
}
func (iterator IPairedBioSequenceBatch) Channel() chan PairedBioSequenceBatch {
return iterator.pointer.channel
}
func (iterator IPairedBioSequenceBatch) IsNil() bool {
return iterator.pointer == nil
}
func (iterator IPairedBioSequenceBatch) BufferSize() int {
return iterator.pointer.buffer_size
}
func (iterator IPairedBioSequenceBatch) Split() IPairedBioSequenceBatch {
i := __ipairedbiosequencebatch__{
channel: iterator.pointer.channel,
current: NilPairedBioSequenceBatch,
pushBack: false,
all_done: iterator.pointer.all_done,
buffer_size: iterator.pointer.buffer_size,
finished: false,
p_finished: iterator.pointer.p_finished}
newIter := IPairedBioSequenceBatch{&i}
return newIter
}
func (iterator IPairedBioSequenceBatch) Next() bool {
if *(iterator.pointer.p_finished) {
return false
}
if iterator.pointer.pushBack {
iterator.pointer.pushBack = false
return true
}
next, ok := (<-iterator.pointer.channel)
if ok {
iterator.pointer.current = next
return true
}
iterator.pointer.current = NilPairedBioSequenceBatch
*iterator.pointer.p_finished = true
return false
}
func (iterator IPairedBioSequenceBatch) PushBack() {
if !iterator.pointer.current.IsNil() {
iterator.pointer.pushBack = true
}
}
// The 'Get' method returns the instance of BioSequenceBatch
// currently pointed by the iterator. You have to use the
// 'Next' method to move to the next entry before calling
// 'Get' to retreive the following instance.
func (iterator IPairedBioSequenceBatch) Get() PairedBioSequenceBatch {
return iterator.pointer.current
}
// Finished returns 'true' value if no more data is available
// from the iterator.
func (iterator IPairedBioSequenceBatch) Finished() bool {
return *iterator.pointer.p_finished
}
func (iterator IPairedBioSequenceBatch) SortBatches(sizes ...int) IPairedBioSequenceBatch {
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
buffsize = sizes[0]
}
newIter := MakeIPairedBioSequenceBatch(buffsize)
newIter.Add(1)
go func() {
newIter.Wait()
close(newIter.pointer.channel)
}()
next_to_send := 0
received := make(map[int]PairedBioSequenceBatch)
go func() {
for iterator.Next() {
batch := iterator.Get()
if batch.order == next_to_send {
newIter.pointer.channel <- batch
next_to_send++
batch, ok := received[next_to_send]
for ok {
newIter.pointer.channel <- batch
delete(received, next_to_send)
next_to_send++
batch, ok = received[next_to_send]
}
} else {
received[batch.order] = batch
}
}
newIter.Done()
}()
return newIter
}

View File

@@ -2,6 +2,9 @@ package obiseq
import (
"context"
"fmt"
"regexp"
log "github.com/sirupsen/logrus"
"github.com/PaesslerAG/gval"
@@ -10,37 +13,55 @@ import (
type SequencePredicate func(*BioSequence) bool
func (predicate1 SequencePredicate) And(predicate2 SequencePredicate) SequencePredicate {
f := func(sequence *BioSequence) bool {
return predicate1(sequence) && predicate2(sequence)
switch {
case predicate1 == nil:
return predicate2
case predicate2 == nil:
return predicate1
default:
return func(sequence *BioSequence) bool {
return predicate1(sequence) && predicate2(sequence)
}
}
return f
}
func (predicate1 SequencePredicate) Or(predicate2 SequencePredicate) SequencePredicate {
f := func(sequence *BioSequence) bool {
return predicate1(sequence) || predicate2(sequence)
switch {
case predicate1 == nil:
return predicate2
case predicate2 == nil:
return predicate1
default:
return func(sequence *BioSequence) bool {
return predicate1(sequence) || predicate2(sequence)
}
}
return f
}
func (predicate1 SequencePredicate) Xor(predicate2 SequencePredicate) SequencePredicate {
f := func(sequence *BioSequence) bool {
p1 := predicate1(sequence)
p2 := predicate2(sequence)
return (p1 && !p2) || (p2 && !p1)
switch {
case predicate1 == nil:
return predicate2
case predicate2 == nil:
return predicate1
default:
return func(sequence *BioSequence) bool {
p1 := predicate1(sequence)
p2 := predicate2(sequence)
return (p1 && !p2) || (p2 && !p1)
}
}
return f
}
func (predicate1 SequencePredicate) Not() SequencePredicate {
f := func(sequence *BioSequence) bool {
return !predicate1(sequence)
switch {
case predicate1 == nil:
return nil
default:
return func(sequence *BioSequence) bool {
return !predicate1(sequence)
}
}
return f
}
func HasAttribute(name string) SequencePredicate {
@@ -57,9 +78,43 @@ func HasAttribute(name string) SequencePredicate {
return f
}
func MoreAbundantThan(count int) SequencePredicate {
func IsAttributeMatch(name string, pattern string) SequencePredicate {
pat, err := regexp.Compile(pattern)
if err != nil {
log.Fatalf("error in atribute %s regular pattern syntax : %v", name, err)
}
f := func(sequence *BioSequence) bool {
return sequence.Count() > count
if sequence.HasAnnotation() {
val, ok := (sequence.Annotations())[name]
if ok {
switch val := val.(type) {
case string:
return pat.MatchString(val)
default:
return pat.MatchString(fmt.Sprint(val))
}
}
}
return false
}
return f
}
func IsMoreAbundantOrEqualTo(count int) SequencePredicate {
f := func(sequence *BioSequence) bool {
return sequence.Count() >= count
}
return f
}
func IsLessAbundantOrEqualTo(count int) SequencePredicate {
f := func(sequence *BioSequence) bool {
return sequence.Count() <= count
}
return f
@@ -81,7 +136,64 @@ func IsShorterOrEqualTo(length int) SequencePredicate {
return f
}
func ExrpessionPredicat(expression string) SequencePredicate {
func IsSequenceMatch(pattern string) SequencePredicate {
pat, err := regexp.Compile("(?i)" + pattern)
if err != nil {
log.Fatalf("error in sequence regular pattern syntax : %v", err)
}
f := func(sequence *BioSequence) bool {
return pat.Match(sequence.Sequence())
}
return f
}
func IsDefinitionMatch(pattern string) SequencePredicate {
pat, err := regexp.Compile(pattern)
if err != nil {
log.Fatalf("error in definition regular pattern syntax : %v", err)
}
f := func(sequence *BioSequence) bool {
return pat.MatchString(sequence.Definition())
}
return f
}
func IsIdMatch(pattern string) SequencePredicate {
pat, err := regexp.Compile(pattern)
if err != nil {
log.Fatalf("error in identifier regular pattern syntax : %v", err)
}
f := func(sequence *BioSequence) bool {
return pat.MatchString(sequence.Id())
}
return f
}
func IsIdIn(ids ...string) SequencePredicate {
idset := make(map[string]bool)
for _, v := range ids {
idset[v] = true
}
f := func(sequence *BioSequence) bool {
_, ok := idset[sequence.Id()]
return ok
}
return f
}
func ExpressionPredicat(expression string) SequencePredicate {
exp, err := gval.Full().NewEvaluable(expression)

View File

@@ -1,2 +0,0 @@
package obiseq