Skip to content

Commit

Permalink
Ignore duplicate attempt to start event listening
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Jan 5, 2024
1 parent 2966920 commit 19c6706
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"

gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
Expand All @@ -16,8 +17,12 @@ type EventsListener func(*pallets.Events)

type Client struct {
*gsrpc.SubstrateAPI

eventsListeners map[int]EventsListener
mu sync.Mutex
isListening uint32
stopListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
DdcCustomers pallets.DdcCustomersApi
Expand Down Expand Up @@ -46,6 +51,10 @@ func NewClient(url string) (*Client, error) {
}

func (c *Client) StartEventsListening() (func(), <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.stopListening, c.errsListening, nil
}

meta, err := c.RPC.State.GetMetadataLatest()
if err != nil {
return nil, nil, err
Expand All @@ -60,7 +69,7 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
}

done := make(chan struct{})
errCh := make(chan error)
c.errsListening = make(chan error)

go func() {
for {
Expand All @@ -76,7 +85,7 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
events := &pallets.Events{}
err = types.EventRecordsRaw(change.StorageData).DecodeEventRecords(meta, events)
if err != nil {
errCh <- fmt.Errorf("events decoder: %w", err)
c.errsListening <- fmt.Errorf("events decoder: %w", err)
}

for _, callback := range c.eventsListeners {
Expand All @@ -88,14 +97,15 @@ func (c *Client) StartEventsListening() (func(), <-chan error, error) {
}()

once := sync.Once{}
stop := func() {
c.stopListening = func() {
once.Do(func() {
done <- struct{}{}
sub.Unsubscribe()
c.isListening = 0
})
}

return stop, errCh, nil
return c.stopListening, c.errsListening, nil
}

func (c *Client) RegisterEventsListener(callback EventsListener) (func(), error) {
Expand Down

0 comments on commit 19c6706

Please sign in to comment.