diff --git a/blockchain/client.go b/blockchain/client.go index 0d57eae..e90b3e5 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -2,13 +2,8 @@ package blockchain import ( "context" - "errors" - "fmt" "sync" - "sync/atomic" - "time" - "github.com/cenkalti/backoff" gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" "github.com/centrifuge/go-substrate-rpc-client/v4/registry" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/exec" @@ -16,13 +11,12 @@ import ( "github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/state" "github.com/centrifuge/go-substrate-rpc-client/v4/types" + "golang.org/x/sync/errgroup" "github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets" ) -var errCancelled = errors.New("cancelled") - -type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) +type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) error type Client struct { *gsrpc.SubstrateAPI @@ -30,10 +24,6 @@ type Client struct { mu sync.Mutex eventsListeners map[*EventsListener]struct{} - isListening uint32 - cancelListening func() - errsListening chan error - DdcClusters pallets.DdcClustersApi DdcCustomers pallets.DdcCustomersApi DdcNodes pallets.DdcNodesApi @@ -60,22 +50,20 @@ func NewClient(url string) (*Client, error) { }, nil } -// StartEventsListening subscribes for blockchain events and passes events starting from the -// 'begin' block to registered events listeners. Listeners registered after this call will only -// receive live events meaning all listeners which need historical events from 'begin' block -// should be registered at the moment of calling this function. The 'afterBlock' callback is -// invoked after all registered events listeners are already invoked. -func (c *Client) StartEventsListening( +// ListenEvents listens for blockchain events and sequentially calls registered events listeners to +// process incoming events. It starts from the block begin and calls callback after when all events +// listeners already called on a block events. +// +// ListenEvents always returns a non-nil error from a registered events listener or a callback +// after. +func (c *Client) ListenEvents( + ctx context.Context, begin types.BlockNumber, - after func(blockNumber types.BlockNumber, blockHash types.Hash), -) (context.CancelFunc, <-chan error, error) { - if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) { - return c.cancelListening, c.errsListening, nil - } - + after func(blockNumber types.BlockNumber, blockHash types.Hash) error, +) error { sub, err := c.RPC.Chain.SubscribeNewHeads() if err != nil { - return nil, nil, fmt.Errorf("subscribe new heads: %w", err) + return err } retriever, err := retriever.NewEventRetriever( @@ -87,78 +75,81 @@ func (c *Client) StartEventsListening( exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(0)), ) if err != nil { - return nil, nil, fmt.Errorf("event retriever: %w", err) + return err } - c.errsListening = make(chan error) + g, ctx := errgroup.WithContext(ctx) liveHeadersC := sub.Chan() - histHeadersC := make(chan types.Header) - var wg sync.WaitGroup + go func() { + <-ctx.Done() + sub.Unsubscribe() + }() // Query historical headers. - var cancelled atomic.Value - cancelled.Store(false) - wg.Add(1) - go func(beginBlock types.BlockNumber, live <-chan types.Header, hist chan types.Header) { - defer wg.Done() - defer close(hist) - - firstLiveHeader := <-live // the first live header is the last historical header - - for block := beginBlock; block < firstLiveHeader.Number; { - var header *types.Header - err := retryUntilCancelled(func() error { - blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block)) - if err != nil { - c.errsListening <- fmt.Errorf("get historical block hash: %w", err) - return err - } + histHeadersC := make(chan types.Header) + g.Go(func() error { + defer close(histHeadersC) - header, err = c.RPC.Chain.GetHeader(blockHash) - if err != nil { - c.errsListening <- fmt.Errorf("get historical header: %w", err) - return err - } + firstLiveHeader, ok := <-liveHeadersC // first live header will be the last historical + if !ok { + return ctx.Err() + } - return nil - }, &cancelled) + for block := begin; block < firstLiveHeader.Number; block++ { + blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block)) if err != nil { - if err == errCancelled { - return - } - continue + return err + } + + header, err := c.RPC.Chain.GetHeader(blockHash) + if err != nil { + return err } - hist <- *header + select { + case <-ctx.Done(): + return ctx.Err() + case histHeadersC <- *header: + } + } - block++ + select { + case <-ctx.Done(): + return ctx.Err() + case histHeadersC <- firstLiveHeader: } - hist <- firstLiveHeader - }(begin, liveHeadersC, histHeadersC) + return nil + }) // Sequence historical and live headers. headersC := make(chan types.Header) - wg.Add(1) - go func(hist, live <-chan types.Header, headersC chan types.Header) { - defer wg.Done() + g.Go(func() error { defer close(headersC) - for header := range hist { - headersC <- header + for header := range histHeadersC { + select { + case <-ctx.Done(): + return ctx.Err() + case headersC <- header: + } } - for header := range live { - headersC <- header + for header := range liveHeadersC { + select { + case <-ctx.Done(): + return ctx.Err() + case headersC <- header: + } } - }(histHeadersC, liveHeadersC, headersC) + + return nil + }) // Retrieve events skipping blocks before 'begin'. eventsC := make(chan blockEvents) - wg.Add(1) - go func(headersC <-chan types.Header, eventsC chan blockEvents) { - defer wg.Done() + g.Go(func() error { defer close(eventsC) for header := range headersC { @@ -166,63 +157,52 @@ func (c *Client) StartEventsListening( continue } - var hash types.Hash - var events []*parser.Event - err := retryUntilCancelled(func() error { - var err error - hash, err = c.RPC.Chain.GetBlockHash(uint64(header.Number)) - if err != nil { - c.errsListening <- fmt.Errorf("get block hash: %w", err) - return err - } - - events, err = retriever.GetEvents(hash) - if err != nil { - c.errsListening <- fmt.Errorf("events retriever: %w", err) - return err - } + hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number)) + if err != nil { + return err + } - return nil - }, &cancelled) + events, err := retriever.GetEvents(hash) if err != nil { - continue + return err } - eventsC <- blockEvents{ + select { + case <-ctx.Done(): + return ctx.Err() + case eventsC <- blockEvents{ Events: events, Hash: hash, Number: header.Number, + }: } } - }(headersC, eventsC) + + return nil + }) // Invoke listeners. - wg.Add(1) - go func(eventsC <-chan blockEvents) { - defer wg.Done() + g.Go(func() error { for blockEvents := range eventsC { for callback := range c.eventsListeners { - (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash) + err := (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash) + if err != nil { + return err + } } if after != nil { - after(blockEvents.Number, blockEvents.Hash) + err := after(blockEvents.Number, blockEvents.Hash) + if err != nil { + return err + } } } - }(eventsC) - once := sync.Once{} - c.cancelListening = func() { - once.Do(func() { - sub.Unsubscribe() - cancelled.Store(true) - wg.Wait() - close(c.errsListening) - c.isListening = 0 - }) - } + return ctx.Err() + }) - return c.cancelListening, c.errsListening, nil + return g.Wait() } // RegisterEventsListener subscribes given callback to blockchain events. @@ -246,20 +226,3 @@ type blockEvents struct { Hash types.Hash Number types.BlockNumber } - -func retryUntilCancelled(f func() error, cancelled *atomic.Value) error { - expbackoff := backoff.NewExponentialBackOff() - expbackoff.MaxElapsedTime = 0 // never stop - expbackoff.InitialInterval = 10 * time.Second - expbackoff.Multiplier = 2 - expbackoff.MaxInterval = 10 * time.Minute - - ff := func() error { - if cancelled.Load().(bool) { - return backoff.Permanent(errCancelled) - } - return f() - } - - return backoff.Retry(ff, expbackoff) -}