Skip to content

Commit

Permalink
Events listening callbacks error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
khssnv committed Jun 10, 2024
1 parent fee98b8 commit f4d9b65
Showing 1 changed file with 89 additions and 126 deletions.
215 changes: 89 additions & 126 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,28 @@ package blockchain

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/exec"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/state"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"golang.org/x/sync/errgroup"

"github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets"
)

var errCancelled = errors.New("cancelled")

type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash)
type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) error

type Client struct {
*gsrpc.SubstrateAPI

mu sync.Mutex
eventsListeners map[*EventsListener]struct{}

isListening uint32
cancelListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
DdcCustomers pallets.DdcCustomersApi
DdcNodes pallets.DdcNodesApi
Expand All @@ -60,22 +50,20 @@ func NewClient(url string) (*Client, error) {
}, nil
}

// StartEventsListening subscribes for blockchain events and passes events starting from the
// 'begin' block to registered events listeners. Listeners registered after this call will only
// receive live events meaning all listeners which need historical events from 'begin' block
// should be registered at the moment of calling this function. The 'afterBlock' callback is
// invoked after all registered events listeners are already invoked.
func (c *Client) StartEventsListening(
// ListenEvents listens for blockchain events and sequentially calls registered events listeners to
// process incoming events. It starts from the block begin and calls callback after when all events
// listeners already called on a block events.
//
// ListenEvents always returns a non-nil error from a registered events listener or a callback
// after.
func (c *Client) ListenEvents(
ctx context.Context,
begin types.BlockNumber,
after func(blockNumber types.BlockNumber, blockHash types.Hash),
) (context.CancelFunc, <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.cancelListening, c.errsListening, nil
}

after func(blockNumber types.BlockNumber, blockHash types.Hash) error,
) error {
sub, err := c.RPC.Chain.SubscribeNewHeads()
if err != nil {
return nil, nil, fmt.Errorf("subscribe new heads: %w", err)
return err
}

retriever, err := retriever.NewEventRetriever(
Expand All @@ -87,142 +75,134 @@ func (c *Client) StartEventsListening(
exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(0)),
)
if err != nil {
return nil, nil, fmt.Errorf("event retriever: %w", err)
return err
}

c.errsListening = make(chan error)
g, ctx := errgroup.WithContext(ctx)

liveHeadersC := sub.Chan()
histHeadersC := make(chan types.Header)
var wg sync.WaitGroup
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()

// Query historical headers.
var cancelled atomic.Value
cancelled.Store(false)
wg.Add(1)
go func(beginBlock types.BlockNumber, live <-chan types.Header, hist chan types.Header) {
defer wg.Done()
defer close(hist)

firstLiveHeader := <-live // the first live header is the last historical header

for block := beginBlock; block < firstLiveHeader.Number; {
var header *types.Header
err := retryUntilCancelled(func() error {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
c.errsListening <- fmt.Errorf("get historical block hash: %w", err)
return err
}
histHeadersC := make(chan types.Header)
g.Go(func() error {
defer close(histHeadersC)

header, err = c.RPC.Chain.GetHeader(blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("get historical header: %w", err)
return err
}
firstLiveHeader, ok := <-liveHeadersC // first live header will be the last historical
if !ok {
return ctx.Err()
}

return nil
}, &cancelled)
for block := begin; block < firstLiveHeader.Number; block++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
if err == errCancelled {
return
}
continue
return err
}

header, err := c.RPC.Chain.GetHeader(blockHash)
if err != nil {
return err
}

hist <- *header
select {
case <-ctx.Done():
return ctx.Err()
case histHeadersC <- *header:
}
}

block++
select {
case <-ctx.Done():
return ctx.Err()
case histHeadersC <- firstLiveHeader:
}

hist <- firstLiveHeader
}(begin, liveHeadersC, histHeadersC)
return nil
})

// Sequence historical and live headers.
headersC := make(chan types.Header)
wg.Add(1)
go func(hist, live <-chan types.Header, headersC chan types.Header) {
defer wg.Done()
g.Go(func() error {
defer close(headersC)

for header := range hist {
headersC <- header
for header := range histHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
}

for header := range live {
headersC <- header
for header := range liveHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
}
}(histHeadersC, liveHeadersC, headersC)

return nil
})

// Retrieve events skipping blocks before 'begin'.
eventsC := make(chan blockEvents)
wg.Add(1)
go func(headersC <-chan types.Header, eventsC chan blockEvents) {
defer wg.Done()
g.Go(func() error {
defer close(eventsC)

for header := range headersC {
if header.Number < begin {
continue
}

var hash types.Hash
var events []*parser.Event
err := retryUntilCancelled(func() error {
var err error
hash, err = c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return err
}

events, err = retriever.GetEvents(hash)
if err != nil {
c.errsListening <- fmt.Errorf("events retriever: %w", err)
return err
}
hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
return err
}

return nil
}, &cancelled)
events, err := retriever.GetEvents(hash)
if err != nil {
continue
return err
}

eventsC <- blockEvents{
select {
case <-ctx.Done():
return ctx.Err()
case eventsC <- blockEvents{
Events: events,
Hash: hash,
Number: header.Number,
}:
}
}
}(headersC, eventsC)

return nil
})

// Invoke listeners.
wg.Add(1)
go func(eventsC <-chan blockEvents) {
defer wg.Done()
g.Go(func() error {
for blockEvents := range eventsC {
for callback := range c.eventsListeners {
(*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
err := (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
}
}

if after != nil {
after(blockEvents.Number, blockEvents.Hash)
err := after(blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
}
}
}
}(eventsC)

once := sync.Once{}
c.cancelListening = func() {
once.Do(func() {
sub.Unsubscribe()
cancelled.Store(true)
wg.Wait()
close(c.errsListening)
c.isListening = 0
})
}
return ctx.Err()
})

return c.cancelListening, c.errsListening, nil
return g.Wait()
}

// RegisterEventsListener subscribes given callback to blockchain events.
Expand All @@ -246,20 +226,3 @@ type blockEvents struct {
Hash types.Hash
Number types.BlockNumber
}

func retryUntilCancelled(f func() error, cancelled *atomic.Value) error {
expbackoff := backoff.NewExponentialBackOff()
expbackoff.MaxElapsedTime = 0 // never stop
expbackoff.InitialInterval = 10 * time.Second
expbackoff.Multiplier = 2
expbackoff.MaxInterval = 10 * time.Minute

ff := func() error {
if cancelled.Load().(bool) {
return backoff.Permanent(errCancelled)
}
return f()
}

return backoff.Retry(ff, expbackoff)
}

0 comments on commit f4d9b65

Please sign in to comment.