Skip to content

Commit

Permalink
Merge pull request #135 from mschneider82/master
Browse files Browse the repository at this point in the history
close channel only once. fixes panic #134
  • Loading branch information
worg authored Jul 14, 2024
2 parents 130c3ad + 3acb09a commit f7566e1
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Subscription struct {
state int32
closeMutex *sync.Mutex
closeCond *sync.Cond
closeOnce sync.Once
unsubscribeReceiptTimeout time.Duration
}

Expand Down Expand Up @@ -146,12 +147,14 @@ func (s *Subscription) Read() (*Message, error) {
}

func (s *Subscription) closeChannel(msg *Message) {
if msg != nil {
s.C <- msg
}
atomic.StoreInt32(&s.state, subStateClosed)
close(s.C)
s.closeCond.Broadcast()
s.closeOnce.Do(func() {
if msg != nil {
s.C <- msg
}
atomic.StoreInt32(&s.state, subStateClosed)
close(s.C)
s.closeCond.Broadcast()
})
}

func (s *Subscription) subscriptionErrorMessage(message string) *Message {
Expand Down

0 comments on commit f7566e1

Please sign in to comment.