Adds a first version of a new obidistribute command

This commit is contained in:
2022-02-14 00:01:01 +01:00
parent 1544bafde1
commit eb32620bb3
13 changed files with 567 additions and 36 deletions

View File

@@ -1,8 +1,12 @@
package obiseq
import (
"fmt"
"log"
"sync"
"sync/atomic"
"github.com/tevino/abool/v2"
)
type BioSequenceBatch struct {
@@ -36,40 +40,45 @@ func (batch BioSequenceBatch) IsNil() bool {
// Structure implementing an iterator over bioseq.BioSequenceBatch
// based on a channel.
type __ibiosequencebatch__ struct {
channel chan BioSequenceBatch
current BioSequenceBatch
pushBack bool
all_done *sync.WaitGroup
buffer_size int
finished bool
p_finished *bool
type _IBioSequenceBatch struct {
channel chan BioSequenceBatch
current BioSequenceBatch
pushBack *abool.AtomicBool
all_done *sync.WaitGroup
lock *sync.RWMutex
buffer_size int32
batch_size int32
sequence_format string
finished *abool.AtomicBool
}
type IBioSequenceBatch struct {
pointer *__ibiosequencebatch__
pointer *_IBioSequenceBatch
}
// NilIBioSequenceBatch is the IBioSequenceBatch value whose internal
// pointer is nil, i.e. an iterator handle that refers to no iterator state.
var NilIBioSequenceBatch = IBioSequenceBatch{pointer: nil}
func MakeIBioSequenceBatch(sizes ...int) IBioSequenceBatch {
buffsize := 1
buffsize := int32(1)
if len(sizes) > 0 {
buffsize = sizes[0]
buffsize = int32(sizes[0])
}
i := __ibiosequencebatch__{
channel: make(chan BioSequenceBatch, buffsize),
current: NilBioSequenceBatch,
pushBack: false,
buffer_size: buffsize,
finished: false,
p_finished: nil,
i := _IBioSequenceBatch{
channel: make(chan BioSequenceBatch, buffsize),
current: NilBioSequenceBatch,
pushBack: abool.New(),
buffer_size: buffsize,
batch_size: -1,
sequence_format: "",
finished: abool.New(),
}
i.p_finished = &i.finished
waiting := sync.WaitGroup{}
i.all_done = &waiting
lock := sync.RWMutex{}
i.lock = &lock
ii := IBioSequenceBatch{&i}
return ii
}
@@ -82,6 +91,22 @@ func (iterator IBioSequenceBatch) Done() {
iterator.pointer.all_done.Done()
}
// Unlock releases the write lock of the iterator's internal RWMutex.
// It must be paired with a previous call to Lock.
func (iterator IBioSequenceBatch) Unlock() {
iterator.pointer.lock.Unlock()
}
// Lock acquires the write lock of the iterator's internal RWMutex,
// blocking until it is available.
func (iterator IBioSequenceBatch) Lock() {
iterator.pointer.lock.Lock()
}
// RLock acquires a read lock on the iterator's internal RWMutex.
func (iterator IBioSequenceBatch) RLock() {
iterator.pointer.lock.RLock()
}
// RUnlock releases a read lock previously taken with RLock on the
// iterator's internal RWMutex.
func (iterator IBioSequenceBatch) RUnlock() {
iterator.pointer.lock.RUnlock()
}
// Wait blocks until the counter of the iterator's all_done WaitGroup
// drops to zero.
func (iterator IBioSequenceBatch) Wait() {
iterator.pointer.all_done.Wait()
}
@@ -95,29 +120,48 @@ func (iterator IBioSequenceBatch) IsNil() bool {
}
func (iterator IBioSequenceBatch) BufferSize() int {
return iterator.pointer.buffer_size
return int(atomic.LoadInt32(&iterator.pointer.buffer_size))
}
// BatchSize returns the batch size recorded on the iterator.
// The field is read with an atomic load and converted to int.
func (iterator IBioSequenceBatch) BatchSize() int {
return int(atomic.LoadInt32(&iterator.pointer.batch_size))
}
// SetBatchSize records size as the batch size of the iterator.
//
// Negative sizes are rejected with an error and leave the stored value
// untouched. Accepted sizes are narrowed to int32 and written with an
// atomic store.
func (iterator IBioSequenceBatch) SetBatchSize(size int) error {
	if size < 0 {
		return fmt.Errorf("size (%d) cannot be negative", size)
	}

	atomic.StoreInt32(&iterator.pointer.batch_size, int32(size))

	return nil
}
func (iterator IBioSequenceBatch) Split() IBioSequenceBatch {
i := __ibiosequencebatch__{
channel: iterator.pointer.channel,
current: NilBioSequenceBatch,
pushBack: false,
all_done: iterator.pointer.all_done,
buffer_size: iterator.pointer.buffer_size,
finished: false,
p_finished: iterator.pointer.p_finished}
iterator.pointer.lock.RLock()
defer iterator.pointer.lock.RUnlock()
i := _IBioSequenceBatch{
channel: iterator.pointer.channel,
current: NilBioSequenceBatch,
pushBack: abool.New(),
all_done: iterator.pointer.all_done,
buffer_size: iterator.pointer.buffer_size,
batch_size: iterator.pointer.batch_size,
sequence_format: iterator.pointer.sequence_format,
finished: iterator.pointer.finished}
lock := sync.RWMutex{}
i.lock = &lock
newIter := IBioSequenceBatch{&i}
return newIter
}
func (iterator IBioSequenceBatch) Next() bool {
if *(iterator.pointer.p_finished) {
if iterator.pointer.finished.IsSet() {
return false
}
if iterator.pointer.pushBack {
iterator.pointer.pushBack = false
if iterator.pointer.pushBack.IsSet() {
iterator.pointer.pushBack.UnSet()
return true
}
@@ -129,13 +173,13 @@ func (iterator IBioSequenceBatch) Next() bool {
}
iterator.pointer.current = NilBioSequenceBatch
*iterator.pointer.p_finished = true
iterator.pointer.finished.Set()
return false
}
func (iterator IBioSequenceBatch) PushBack() {
if !iterator.pointer.current.IsNil() {
iterator.pointer.pushBack = true
iterator.pointer.pushBack.Set()
}
}
@@ -150,7 +194,7 @@ func (iterator IBioSequenceBatch) Get() BioSequenceBatch {
// Finished returns 'true' value if no more data is available
// from the iterator.
func (iterator IBioSequenceBatch) Finished() bool {
return *iterator.pointer.p_finished
return iterator.pointer.finished.IsSet()
}
func (iterator IBioSequenceBatch) IBioSequence(sizes ...int) IBioSequence {
@@ -378,7 +422,7 @@ func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate,
buffsize := iterator.BufferSize()
if len(sizes) > 0 {
buffsize = sizes[1]
buffsize = sizes[0]
}
trueIter := MakeIBioSequenceBatch(buffsize)

65
pkg/obiseq/class.go Normal file
View File

@@ -0,0 +1,65 @@
package obiseq
import (
"fmt"
"hash/crc32"
"strconv"
)
// SequenceClassifier is a function that maps a sequence to the name of
// the class it belongs to, expressed as a string.
type SequenceClassifier func(sequence BioSequence) string
// AnnotationClassifier builds a classifier that uses the value of the
// annotation stored under key as the class name.
//
// String values are returned unchanged; any other value is rendered with
// fmt.Sprint. A sequence without annotations, or without the requested
// key, is classified as the empty string.
func AnnotationClassifier(key string) SequenceClassifier {
	return func(sequence BioSequence) string {
		if !sequence.HasAnnotation() {
			return ""
		}

		raw, found := sequence.Annotations()[key]
		if !found {
			return ""
		}

		if text, isString := raw.(string); isString {
			return text
		}

		return fmt.Sprint(raw)
	}
}
// SampleClassifier classifies sequences according to the value of their
// "sample" annotation.
var SampleClassifier = AnnotationClassifier("sample")
// PredicateClassifier builds a two-class classifier from predicate:
// sequences accepted by the predicate fall in class "true", all the
// others in class "false".
func PredicateClassifier(predicate SequencePredicate) SequenceClassifier {
	return func(sequence BioSequence) string {
		if predicate(sequence) {
			return "true"
		}

		return "false"
	}
}
// HashClassifier builds a classifier that assigns each sequence to one
// of size classes according to the CRC32 (IEEE) checksum of its sequence
// data: the class name is the decimal value of checksum modulo size.
//
// size must be strictly positive: a size of 0 makes the returned
// classifier panic with an integer division by zero.
func HashClassifier(size int) SequenceClassifier {
	return func(sequence BioSequence) string {
		checksum := crc32.ChecksumIEEE(sequence.Sequence())
		bucket := checksum % uint32(size)

		return strconv.Itoa(int(bucket))
	}
}
// RotateClassifier builds a classifier that ignores the content of the
// sequences and hands out class names "0", "1", ... in turn, cycling
// every size calls.
//
// Each returned classifier owns its private call counter; the counter is
// a plain int updated without synchronization. A size of 0 makes the
// classifier panic with an integer division by zero.
func RotateClassifier(size int) SequenceClassifier {
	counter := 0

	return func(sequence BioSequence) string {
		class := strconv.Itoa(counter % size)
		counter++

		return class
	}
}

107
pkg/obiseq/distribute.go Normal file
View File

@@ -0,0 +1,107 @@
package obiseq
import (
"fmt"
"sync"
)
// IDistribute is the handle returned by IBioSequenceBatch.Distribute.
// It gives access to one output iterator per class key.
type IDistribute struct {
// outputs maps each class key to its output iterator.
outputs map[string]IBioSequenceBatch
// news carries every class key the first time it is encountered.
news chan string
// lock guards accesses to outputs.
lock *sync.Mutex
}
// Outputs returns the output iterator registered for the class key.
//
// The lookup is done while holding the distributor's lock. When no
// iterator is registered under key, NilIBioSequenceBatch is returned
// together with an error.
func (dist *IDistribute) Outputs(key string) (IBioSequenceBatch, error) {
	dist.lock.Lock()
	defer dist.lock.Unlock()

	if iter, found := dist.outputs[key]; found {
		return iter, nil
	}

	return NilIBioSequenceBatch, fmt.Errorf("key %s unknown", key)
}
// News returns the channel on which the class keys are published.
func (dist *IDistribute) News() chan string {
return dist.news
}
// Distribute dispatches the sequences read from iterator into one output
// iterator per class, the class of each sequence being computed by class.
//
// The optional sizes are, in order:
//   - sizes[0]: the number of sequences per emitted batch (default 5000);
//   - sizes[1]: the channel buffer size of each output iterator (default 2).
//
// The work is done asynchronously. Every time a class key is met for the
// first time, a new output iterator is registered and the key is sent on
// the News channel; the matching iterator can then be fetched with
// Outputs. The news channel is unbuffered, so the distribution blocks
// until each new key has been received. Once the input is exhausted, the
// partially filled batches are flushed, then the news channel and every
// output channel are closed.
func (iterator IBioSequenceBatch) Distribute(class SequenceClassifier, sizes ...int) IDistribute {
	batchsize := 5000
	buffsize := 2

	outputs := make(map[string]IBioSequenceBatch, 100)
	slices := make(map[string]*BioSequenceSlice, 100)
	orders := make(map[string]int, 100)
	news := make(chan string)

	if len(sizes) > 0 {
		batchsize = sizes[0]
	}

	if len(sizes) > 1 {
		buffsize = sizes[1]
	}

	jobDone := sync.WaitGroup{}
	lock := sync.Mutex{}

	jobDone.Add(1)

	// Closer: waits for the dispatching goroutine to finish, then closes
	// the news channel and every output channel.
	go func() {
		jobDone.Wait()
		close(news)
		for _, i := range outputs {
			close(i.Channel())
		}
	}()

	// Dispatcher: the only goroutine writing to outputs, slices and orders.
	go func() {
		// NOTE(review): presumably restores the batch order of the input
		// stream — confirm against SortBatches.
		iterator = iterator.SortBatches()

		for iterator.Next() {
			seqs := iterator.Get()
			for _, s := range seqs.slice {
				key := class(s)
				slice, ok := slices[key]

				if !ok {
					// First sequence of this class: create its pending
					// slice and its output iterator, then advertise it.
					pending := make(BioSequenceSlice, 0, batchsize)
					slice = &pending
					slices[key] = slice
					orders[key] = 0

					// MakeIBioSequenceBatch only reads its first argument,
					// as the channel buffer size. Passing batchsize first
					// (as done previously) created channels buffering
					// batchsize batches and ignored buffsize.
					lock.Lock()
					outputs[key] = MakeIBioSequenceBatch(buffsize)
					lock.Unlock()

					news <- key
				}

				*slice = append(*slice, s)

				if len(*slice) == batchsize {
					// The pending slice is full: emit it as a batch and
					// start a fresh one for this class.
					outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...)
					orders[key]++
					pending := make(BioSequenceSlice, 0, batchsize)
					slices[key] = &pending
				}
			}
		}

		// Flush the incomplete batches left for each class.
		for key, slice := range slices {
			if len(*slice) > 0 {
				outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...)
			}
		}

		jobDone.Done()
	}()

	return IDistribute{
		outputs,
		news,
		&lock}
}

116
pkg/obiseq/merge.go Normal file
View File

@@ -0,0 +1,116 @@
package obiseq
import (
"fmt"
"log"
)
// StatsOnValues counts occurrences: it maps a value, in its string form,
// to the number of times that value has been recorded.
type StatsOnValues map[string]int
// HasStatsOn reports whether the sequence carries a "merged_<key>"
// annotation, i.e. whether statistics are already stored for key.
// A sequence without annotations never has statistics.
func (sequence BioSequence) HasStatsOn(key string) bool {
	if sequence.HasAnnotation() {
		_, found := sequence.Annotations()["merged_"+key]
		return found
	}

	return false
}
// StatsOn returns the statistics stored on the sequence for key, under
// the "merged_<key>" annotation.
//
// When that annotation is missing, or holds something that is not a
// StatsOnValues, a new empty StatsOnValues is stored in its place. The
// sequence's own value for key is then counted once through
// StatsPlusOne and, if it was counted, the plain key annotation is
// removed from the sequence.
func (sequence BioSequence) StatsOn(key string) StatsOnValues {
	mergedKey := "merged_" + key
	annotations := sequence.Annotations()

	// A missing entry yields a nil interface, for which the assertion
	// fails exactly like it does for a value of another type.
	if existing, isStats := annotations[mergedKey].(StatsOnValues); isStats {
		return existing
	}

	fresh := make(StatsOnValues, 100)
	annotations[mergedKey] = fresh

	// The fresh map is already registered, so the StatsOn call made by
	// StatsPlusOne returns it directly instead of recursing further.
	if sequence.StatsPlusOne(key, sequence) {
		delete(sequence.Annotations(), key)
	}

	return fresh
}
// StatsPlusOne adds one occurrence of toAdd's value for key to the
// statistics stored on sequence for that key.
//
// It returns true when a value was counted, and false when toAdd has no
// annotation or no annotation named key. Only string, integer (signed or
// unsigned, any width, including plain uint) and boolean values can be
// counted; any other type aborts the program through log.Fatalf.
func (sequence BioSequence) StatsPlusOne(key string, toAdd BioSequence) bool {
	if !toAdd.HasAnnotation() {
		return false
	}

	// The statistics are fetched before looking at toAdd: StatsOn may
	// create them and update the annotations of sequence.
	stats := sequence.StatsOn(key)

	value, ok := toAdd.Annotations()[key]
	if !ok {
		return false
	}

	var sval string

	switch value := value.(type) {
	case string:
		sval = value
	case int, uint,
		uint8, uint16, uint32, uint64,
		int8, int16, int32, int64, bool:
		sval = fmt.Sprint(value)
	default:
		log.Fatalf("Trying to make stats on a none string, integer or boolean value (%v)", value)
	}

	// A missing entry reads as zero, so a plain increment is enough.
	stats[sval]++

	return true
}
// Merge adds every count of toMerged to the receiver, creating the
// entries that do not exist yet. The receiver is modified in place and
// returned; toMerged is left untouched.
func (stats StatsOnValues) Merge(toMerged StatsOnValues) StatsOnValues {
	for value, count := range toMerged {
		// A missing entry reads as zero, so += covers both cases.
		stats[value] += count
	}

	return stats
}
// Merge folds tomerge into sequence and returns the merged sequence.
//
// When inplace is false the merge is applied to a copy of sequence,
// otherwise to sequence itself. The "count" annotation of the result is
// set to the sum of both counts. For each of keys, the statistics of
// tomerge are merged into those of the result when tomerge already has
// statistics for that key; otherwise the single value tomerge holds for
// that key is counted once.
func (sequence BioSequence) Merge(tomerge BioSequence, inplace bool, keys ...string) BioSequence {
	if !inplace {
		sequence = sequence.Copy()
	}

	annotations := sequence.Annotations()
	annotations["count"] = tomerge.Count() + sequence.Count()

	for _, key := range keys {
		if !tomerge.HasStatsOn(key) {
			sequence.StatsPlusOne(key, tomerge)
			continue
		}

		// Operands are evaluated left to right: the statistics of
		// sequence are fetched before those of tomerge.
		sequence.StatsOn(key).Merge(tomerge.StatsOn(key))
	}

	return sequence
}