From cba355cddee2d48b451629cf3d520a84781402e0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 25 Jul 2024 18:10:28 -0400 Subject: [PATCH] first blackboard Former-commit-id: d7f45aa3671e18e7dbac7ef3844856241900b1ad --- cmd/test/main.go | 64 ++++++++------ pkg/obiblackboard/blackboard.go | 151 ++++++++++++++++++++++++++++++++ pkg/obiblackboard/queue.go | 8 ++ pkg/obiblackboard/task.go | 21 +++++ 4 files changed, 216 insertions(+), 28 deletions(-) create mode 100644 pkg/obiblackboard/blackboard.go create mode 100644 pkg/obiblackboard/queue.go create mode 100644 pkg/obiblackboard/task.go diff --git a/cmd/test/main.go b/cmd/test/main.go index 219c713..2f1bb1a 100644 --- a/cmd/test/main.go +++ b/cmd/test/main.go @@ -1,38 +1,46 @@ package main import ( - "os" + "fmt" - log "github.com/sirupsen/logrus" - - "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" - "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" - - "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiblackboard" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) -func main() { - optionParser := obioptions.GenerateOptionParser(obiconvert.OptionSet) +func MakeCounter(n int) func(*obiblackboard.Task) *obiblackboard.Task { + count := obiutils.AtomicCounter() - _, args := optionParser(os.Args) - - fs, err := obiconvert.CLIReadBioSequences(args...) - - if err != nil { - log.Errorf("Cannot open file (%v)", err) - os.Exit(1) + r1 := func(task *obiblackboard.Task) *obiblackboard.Task { + val := count() + if val < n { + nt := task.GetNext() + nt.Body = val + return nt + } + return nil } - frags := obiiter.IFragments( - 1000, - 100, - 10, - 100, - obioptions.CLIParallelWorkers(), - ) - - obiconvert.CLIWriteBioSequences(fs.Pipe(frags), true) - - obiiter.WaitForLastPipe() - + return r1 +} + +func r2(task *obiblackboard.Task) *obiblackboard.Task { + fmt.Printf("value : %v\n", task.Body) + return obiblackboard.NewInitialTask() +} + +func rmul(task *obiblackboard.Task) *obiblackboard.Task { + nt := task.GetNext() + nt.Body = task.Body.(int) * 2 + return nt +} + +func main() { + + black := obiblackboard.NewBlackBoard(20) + + black.RegisterRunner("todisplay", "initial", r2) + black.RegisterRunner("multiply", "todisplay", rmul) + black.RegisterRunner("initial", "multiply", MakeCounter(1000)) + + black.Run() } diff --git a/pkg/obiblackboard/blackboard.go b/pkg/obiblackboard/blackboard.go new file mode 100644 index 0000000..b591ca7 --- /dev/null +++ b/pkg/obiblackboard/blackboard.go @@ -0,0 +1,151 @@ +package obiblackboard + +import ( + "slices" + "sync" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" +) + +type Runner struct { + Run func(*Task) *Task + To string +} + +type Blackboard struct { + Board map[int]Queue + BoardLock *sync.Mutex + Runners map[string]Runner + Running *obiutils.Counter + Size int +} + +func NewBlackBoard(size int) *Blackboard { + board := make(map[int]Queue, 0) + runners := make(map[string]Runner, 0) + + if size < 2 { + size = 2 + } + + bb := &Blackboard{ + Board: board, + BoardLock: &sync.Mutex{}, + Runners: runners, + Running: obiutils.NewCounter(), + Size: size, + } + + for i := 0; i < size; i++ { + bb.PushTask(NewInitialTask()) + } + + return bb +} + +func (bb *Blackboard) RegisterRunner(from, to string, runner func(*Task) *Task) { + bb.Runners[from] = Runner{ + Run: runner, + To: to} +} + +func (bb *Blackboard) MaxQueue() Queue { + max_priority := -1 + max_queue := Queue(nil) + for priority, queue := range bb.Board { + if priority > max_priority { + max_queue = queue + } + } + + return max_queue +} + +func (bb *Blackboard) PopTask() *Task { + bb.BoardLock.Lock() + defer bb.BoardLock.Unlock() + + q := bb.MaxQueue() + + if q != nil { + next_task := (*q)[0] + (*q) = (*q)[1:] + if len(*q) == 0 { + delete(bb.Board, next_task.Priority) + } + return next_task + } + + return (*Task)(nil) +} + +func (bb *Blackboard) PushTask(task *Task) { + bb.BoardLock.Lock() + defer bb.BoardLock.Unlock() + + if task != nil { + priority := task.Priority + queue, ok := bb.Board[priority] + + if !ok { + queue = NewQueue() + bb.Board[priority] = queue + } + + *queue = slices.Grow(*queue, 1) + *queue = append((*queue), task) + } +} + +func (bb *Blackboard) Run() { + + ctask := make(chan *Task) + lock := &sync.WaitGroup{} + + launcher := func() { + for task := range ctask { + runner, ok := bb.Runners[task.Role] + + if ok { + task = runner.Run(task) + if task != nil { + task.Role = runner.To + } + } + + bb.PushTask(task) + bb.Running.Dec() + } + + lock.Done() + } + + parallel := bb.Size - 1 + lock.Add(parallel) + + for i := 0; i < parallel; i++ { + go launcher() + } + + go func() { + + for { + bb.Running.Inc() + task := bb.PopTask() + + if task != nil { + ctask <- task + } else { + bb.Running.Dec() + if bb.Running.Value() <= 0 { + break + } + } + + } + + close(ctask) + }() + + lock.Wait() +} diff --git a/pkg/obiblackboard/queue.go b/pkg/obiblackboard/queue.go new file mode 100644 index 0000000..831312f --- /dev/null +++ b/pkg/obiblackboard/queue.go @@ -0,0 +1,8 @@ +package obiblackboard + +type Queue *[]*Task + +func NewQueue() Queue { + q := make([]*Task, 0) + return &q +} diff --git a/pkg/obiblackboard/task.go b/pkg/obiblackboard/task.go new file mode 100644 index 0000000..eca9c61 --- /dev/null +++ b/pkg/obiblackboard/task.go @@ -0,0 +1,21 @@ +package obiblackboard + +type Task struct { + Role string + Priority int + Body interface{} +} + +func NewInitialTask() *Task { + return &Task{ + Role: "initial", + Priority: 0, + Body: nil, + } +} + +func (task *Task) GetNext() *Task { + t := NewInitialTask() + t.Priority = task.Priority + 1 + return t +}