From 76fcadc999ec20893015d6bb0a2b1b6c4d99ac10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Lorenz?= Date: Wed, 15 May 2019 15:58:33 +1000 Subject: [PATCH] Do not use context cancel, remove select --- bus.go | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/bus.go b/bus.go index 51835ed..e3ce546 100644 --- a/bus.go +++ b/bus.go @@ -1,7 +1,6 @@ package messagebus import ( - "context" "fmt" "reflect" "sync" @@ -23,9 +22,7 @@ type MessageBus interface { type handlersMap map[string][]*handler type handler struct { - ctx context.Context callback reflect.Value - cancel context.CancelFunc queue chan []reflect.Value } @@ -53,28 +50,14 @@ func (b *messageBus) Subscribe(topic string, fn interface{}) error { return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn)) } - ctx, cancel := context.WithCancel(context.Background()) - h := &handler{ callback: reflect.ValueOf(fn), - ctx: ctx, - cancel: cancel, queue: make(chan []reflect.Value, b.handlerQueueSize), } go func() { - for { - select { - case args, ok := <-h.queue: - // check if channel has been closed (by Unsubscribe or Close) - if !ok { - return - } - - h.callback.Call(args) - case <-h.ctx.Done(): - return - } + for args := range h.queue { + h.callback.Call(args) } }() @@ -95,7 +78,6 @@ func (b *messageBus) Unsubscribe(topic string, fn interface{}) error { for i, h := range b.handlers[topic] { if h.callback == rv { - h.cancel() close(h.queue) b.handlers[topic] = append(b.handlers[topic][:i], b.handlers[topic][i+1:]...) @@ -114,7 +96,6 @@ func (b *messageBus) Close(topic string) { if _, ok := b.handlers[topic]; ok { for _, h := range b.handlers[topic] { - h.cancel() close(h.queue) }