first blackboard

Former-commit-id: d7f45aa3671e18e7dbac7ef3844856241900b1ad
This commit is contained in:
Eric Coissac
2024-07-25 18:10:28 -04:00
parent 67665a6b40
commit cba355cdde
4 changed files with 216 additions and 28 deletions

View File

@ -1,38 +1,46 @@
package main
import (
"os"
"fmt"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiblackboard"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
func MakeCounter(n int) func(*obiblackboard.Task) *obiblackboard.Task {
count := obiutils.AtomicCounter()
r1 := func(task *obiblackboard.Task) *obiblackboard.Task {
val := count()
if val < n {
nt := task.GetNext()
nt.Body = val
return nt
}
return nil
}
return r1
}
func r2(task *obiblackboard.Task) *obiblackboard.Task {
fmt.Printf("value : %v\n", task.Body)
return obiblackboard.NewInitialTask()
}
func rmul(task *obiblackboard.Task) *obiblackboard.Task {
nt := task.GetNext()
nt.Body = task.Body.(int) * 2
return nt
}
func main() {
optionParser := obioptions.GenerateOptionParser(obiconvert.OptionSet)
_, args := optionParser(os.Args)
black := obiblackboard.NewBlackBoard(20)
fs, err := obiconvert.CLIReadBioSequences(args...)
if err != nil {
log.Errorf("Cannot open file (%v)", err)
os.Exit(1)
}
frags := obiiter.IFragments(
1000,
100,
10,
100,
obioptions.CLIParallelWorkers(),
)
obiconvert.CLIWriteBioSequences(fs.Pipe(frags), true)
obiiter.WaitForLastPipe()
black.RegisterRunner("todisplay", "initial", r2)
black.RegisterRunner("multiply", "todisplay", rmul)
black.RegisterRunner("initial", "multiply", MakeCounter(1000))
black.Run()
}

View File

@ -0,0 +1,151 @@
package obiblackboard
import (
"slices"
"sync"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
type Runner struct {
Run func(*Task) *Task
To string
}
type Blackboard struct {
Board map[int]Queue
BoardLock *sync.Mutex
Runners map[string]Runner
Running *obiutils.Counter
Size int
}
func NewBlackBoard(size int) *Blackboard {
board := make(map[int]Queue, 0)
runners := make(map[string]Runner, 0)
if size < 2 {
size = 2
}
bb := &Blackboard{
Board: board,
BoardLock: &sync.Mutex{},
Runners: runners,
Running: obiutils.NewCounter(),
Size: size,
}
for i := 0; i < size; i++ {
bb.PushTask(NewInitialTask())
}
return bb
}
func (bb *Blackboard) RegisterRunner(from, to string, runner func(*Task) *Task) {
bb.Runners[from] = Runner{
Run: runner,
To: to}
}
func (bb *Blackboard) MaxQueue() Queue {
max_priority := -1
max_queue := Queue(nil)
for priority, queue := range bb.Board {
if priority > max_priority {
max_queue = queue
}
}
return max_queue
}
func (bb *Blackboard) PopTask() *Task {
bb.BoardLock.Lock()
defer bb.BoardLock.Unlock()
q := bb.MaxQueue()
if q != nil {
next_task := (*q)[0]
(*q) = (*q)[1:]
if len(*q) == 0 {
delete(bb.Board, next_task.Priority)
}
return next_task
}
return (*Task)(nil)
}
func (bb *Blackboard) PushTask(task *Task) {
bb.BoardLock.Lock()
defer bb.BoardLock.Unlock()
if task != nil {
priority := task.Priority
queue, ok := bb.Board[priority]
if !ok {
queue = NewQueue()
bb.Board[priority] = queue
}
*queue = slices.Grow(*queue, 1)
*queue = append((*queue), task)
}
}
func (bb *Blackboard) Run() {
ctask := make(chan *Task)
lock := &sync.WaitGroup{}
launcher := func() {
for task := range ctask {
runner, ok := bb.Runners[task.Role]
if ok {
task = runner.Run(task)
if task != nil {
task.Role = runner.To
}
}
bb.PushTask(task)
bb.Running.Dec()
}
lock.Done()
}
parallel := bb.Size - 1
lock.Add(parallel)
for i := 0; i < parallel; i++ {
go launcher()
}
go func() {
for {
bb.Running.Inc()
task := bb.PopTask()
if task != nil {
ctask <- task
} else {
bb.Running.Dec()
if bb.Running.Value() <= 0 {
break
}
}
}
close(ctask)
}()
lock.Wait()
}

View File

@ -0,0 +1,8 @@
package obiblackboard
type Queue *[]*Task
func NewQueue() Queue {
q := make([]*Task, 0)
return &q
}

21
pkg/obiblackboard/task.go Normal file
View File

@ -0,0 +1,21 @@
package obiblackboard
type Task struct {
Role string
Priority int
Body interface{}
}
func NewInitialTask() *Task {
return &Task{
Role: "initial",
Priority: 0,
Body: nil,
}
}
func (task *Task) GetNext() *Task {
t := NewInitialTask()
t.Priority = task.Priority + 1
return t
}