From dfe2fc3d438cd66588c9ec057bce3b26fb52f246 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 29 Jul 2024 11:25:54 +0200 Subject: [PATCH] Last changes on blackboard --- cmd/test/main.go | 25 +++------------- pkg/obiblackboard/blackboard.go | 52 +++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/cmd/test/main.go b/cmd/test/main.go index 2f1bb1a..cb6bcc2 100644 --- a/cmd/test/main.go +++ b/cmd/test/main.go @@ -4,31 +4,14 @@ import ( "fmt" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiblackboard" - "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) -func MakeCounter(n int) func(*obiblackboard.Task) *obiblackboard.Task { - count := obiutils.AtomicCounter() - - r1 := func(task *obiblackboard.Task) *obiblackboard.Task { - val := count() - if val < n { - nt := task.GetNext() - nt.Body = val - return nt - } - return nil - } - - return r1 -} - -func r2(task *obiblackboard.Task) *obiblackboard.Task { +func r2(bb *obiblackboard.Blackboard, task *obiblackboard.Task) *obiblackboard.Task { fmt.Printf("value : %v\n", task.Body) return obiblackboard.NewInitialTask() } -func rmul(task *obiblackboard.Task) *obiblackboard.Task { +func rmul(bb *obiblackboard.Blackboard, task *obiblackboard.Task) *obiblackboard.Task { nt := task.GetNext() nt.Body = task.Body.(int) * 2 return nt @@ -38,9 +21,9 @@ func main() { black := obiblackboard.NewBlackBoard(20) - black.RegisterRunner("todisplay", "initial", r2) + black.RegisterRunner("todisplay", "final", r2) black.RegisterRunner("multiply", "todisplay", rmul) - black.RegisterRunner("initial", "multiply", MakeCounter(1000)) + black.RegisterRunner("initial", "multiply", obiblackboard.DoCount(1000).RepeatTask(4)) black.Run() } diff --git a/pkg/obiblackboard/blackboard.go b/pkg/obiblackboard/blackboard.go index b591ca7..a110d46 100644 --- a/pkg/obiblackboard/blackboard.go +++ b/pkg/obiblackboard/blackboard.go @@ -7,17 +7,28 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) +type DoTask func(*Blackboard, *Task) *Task + type Runner struct { - Run func(*Task) *Task + Run DoTask To string } type Blackboard struct { - Board map[int]Queue - BoardLock *sync.Mutex - Runners map[string]Runner - Running *obiutils.Counter - Size int + Board map[int]Queue + BoardLock *sync.Mutex + Runners map[string]Runner + Running *obiutils.Counter + TargetSize int + Size int +} + +func doFinal(bb *Blackboard, task *Task) *Task { + if bb.Len() > bb.TargetSize { + return nil + } + + return NewInitialTask() } func NewBlackBoard(size int) *Blackboard { @@ -29,24 +40,28 @@ func NewBlackBoard(size int) *Blackboard { } bb := &Blackboard{ - Board: board, - BoardLock: &sync.Mutex{}, - Runners: runners, - Running: obiutils.NewCounter(), - Size: size, + Board: board, + BoardLock: &sync.Mutex{}, + Runners: runners, + Running: obiutils.NewCounter(), + TargetSize: size, + Size: 0, } for i := 0; i < size; i++ { bb.PushTask(NewInitialTask()) } + bb.RegisterRunner("final", "initial", doFinal) + return bb } -func (bb *Blackboard) RegisterRunner(from, to string, runner func(*Task) *Task) { +func (bb *Blackboard) RegisterRunner(from, to string, runner DoTask) { bb.Runners[from] = Runner{ Run: runner, - To: to} + To: to, + } } func (bb *Blackboard) MaxQueue() Queue { @@ -73,6 +88,7 @@ func (bb *Blackboard) PopTask() *Task { if len(*q) == 0 { delete(bb.Board, next_task.Priority) } + bb.Size-- return next_task } @@ -94,6 +110,8 @@ func (bb *Blackboard) PushTask(task *Task) { *queue = slices.Grow(*queue, 1) *queue = append((*queue), task) + + bb.Size++ } } @@ -107,7 +125,7 @@ func (bb *Blackboard) Run() { runner, ok := bb.Runners[task.Role] if ok { - task = runner.Run(task) + task = runner.Run(bb, task) if task != nil { task.Role = runner.To } @@ -120,7 +138,7 @@ func (bb *Blackboard) Run() { lock.Done() } - parallel := bb.Size - 1 + parallel := bb.TargetSize - 1 lock.Add(parallel) for i := 0; i < parallel; i++ { @@ -149,3 +167,7 @@ func (bb *Blackboard) Run() { lock.Wait() } + +func (bb *Blackboard) Len() int { + return bb.Size +}