Skip to content

Commit

Permalink
Do not use context cancel, remove select
Browse files Browse the repository at this point in the history
  • Loading branch information
vardius committed May 15, 2019
1 parent bd1fca0 commit 76fcadc
Showing 1 changed file with 2 additions and 21 deletions.
23 changes: 2 additions & 21 deletions bus.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package messagebus

import (
"context"
"fmt"
"reflect"
"sync"
Expand All @@ -23,9 +22,7 @@ type MessageBus interface {
type handlersMap map[string][]*handler

type handler struct {
ctx context.Context
callback reflect.Value
cancel context.CancelFunc
queue chan []reflect.Value
}

Expand Down Expand Up @@ -53,28 +50,14 @@ func (b *messageBus) Subscribe(topic string, fn interface{}) error {
return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
}

ctx, cancel := context.WithCancel(context.Background())

h := &handler{
callback: reflect.ValueOf(fn),
ctx: ctx,
cancel: cancel,
queue: make(chan []reflect.Value, b.handlerQueueSize),
}

go func() {
for {
select {
case args, ok := <-h.queue:
// check if channel has been closed (by Unsubscribe or Close)
if !ok {
return
}

h.callback.Call(args)
case <-h.ctx.Done():
return
}
for args := range h.queue {
h.callback.Call(args)
}
}()

Expand All @@ -95,7 +78,6 @@ func (b *messageBus) Unsubscribe(topic string, fn interface{}) error {

for i, h := range b.handlers[topic] {
if h.callback == rv {
h.cancel()
close(h.queue)

b.handlers[topic] = append(b.handlers[topic][:i], b.handlers[topic][i+1:]...)
Expand All @@ -114,7 +96,6 @@ func (b *messageBus) Close(topic string) {

if _, ok := b.handlers[topic]; ok {
for _, h := range b.handlers[topic] {
h.cancel()
close(h.queue)
}

Expand Down

0 comments on commit 76fcadc

Please sign in to comment.