diff --git a/subscription.go b/subscription.go index 8e0fb63..d1083e5 100644 --- a/subscription.go +++ b/subscription.go @@ -30,6 +30,7 @@ type Subscription struct { state int32 closeMutex *sync.Mutex closeCond *sync.Cond + closeOnce sync.Once unsubscribeReceiptTimeout time.Duration } @@ -146,12 +147,14 @@ func (s *Subscription) Read() (*Message, error) { } func (s *Subscription) closeChannel(msg *Message) { - if msg != nil { - s.C <- msg - } - atomic.StoreInt32(&s.state, subStateClosed) - close(s.C) - s.closeCond.Broadcast() + s.closeOnce.Do(func() { + if msg != nil { + s.C <- msg + } + atomic.StoreInt32(&s.state, subStateClosed) + close(s.C) + s.closeCond.Broadcast() + }) } func (s *Subscription) subscriptionErrorMessage(message string) *Message {