Skip to content

Commit

Permalink
[DDC | Storage Cluster | Delivery set 17] (Subscribe storage node to …
Browse files Browse the repository at this point in the history
…change topology blockchain event)

Read events from the blockchain seens last block in case websocket was disconnected
Add Cid field to Ack (#42)
  • Loading branch information
Max-Levitskiy committed Jul 3, 2023
1 parent 1327c08 commit 24f3bb7
Show file tree
Hide file tree
Showing 15 changed files with 735 additions and 61 deletions.
1 change: 1 addition & 0 deletions contract/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require (
github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.8
github.com/decred/base58 v1.0.3
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7
github.com/golang/mock v1.3.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.1
Expand Down
1 change: 1 addition & 0 deletions contract/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 h1:5ZkaAPbicIKTF
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
4 changes: 2 additions & 2 deletions contract/pkg/bucket/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package bucket

import (
"encoding/hex"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/utils"
"strings"
"testing"

Expand All @@ -16,7 +16,7 @@ func TestBucketWriteAccess(t *testing.T) {
publicKey := "0xd049e851567f16d68523a645ee96465ceb678ad983fc08e6a38408ad10410c4d"
publicKeyB, _ := hex.DecodeString(strings.TrimPrefix(publicKey, "0x"))

accountID, _ := pkg.DecodeAccountIDFromSS58(ss58)
accountID, _ := utils.DecodeAccountIDFromSS58(ss58)
bucketStatus := &BucketStatus{WriterIds: []types.AccountID{accountID}}

//when
Expand Down
169 changes: 112 additions & 57 deletions contract/pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/sdktypes"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/utils"
"math"
"os/signal"
"reflect"
"sync"
Expand All @@ -23,13 +25,20 @@ const (
CERE = 10_000_000_000
)

var (
chainSubscriptionFactory = subscription.NewChainFactory()
watchdogFactory = subscription.NewWatchdogFactory()
watchdogTimeout = time.Minute
)

type (
blockchainClient struct {
*gsrpc.SubstrateAPI
eventContractAccount types.AccountID
eventDispatcher map[types.Hash]sdktypes.ContractEventDispatchEntry
eventContextCancel context.CancelFunc
eventContextCancel []context.CancelFunc
connectMutex sync.Mutex
eventDecoder subscription.EventDecoder
}
)

Expand All @@ -41,6 +50,7 @@ func CreateBlockchainClient(apiUrl string) sdktypes.BlockchainClient {

return &blockchainClient{
SubstrateAPI: substrateAPI,
eventDecoder: subscription.NewEventDecoder(),
}
}

Expand Down Expand Up @@ -69,33 +79,74 @@ func (b *blockchainClient) listenContractEvents() error {
return err
}

sub, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})
s, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})
if err != nil {
return err
}
b.processChainSubscription(chainSubscriptionFactory.NewChainSubscription(s), key, meta)
return nil
}

func (b *blockchainClient) processChainSubscription(sub subscription.ChainSubscription, key types.StorageKey, meta *types.Metadata) {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
b.eventContextCancel = cancel
watchdog := time.NewTicker(time.Minute)
b.eventContextCancel = append(b.eventContextCancel, cancel)
watchdog := watchdogFactory.NewWatchdog(watchdogTimeout)
eventArrived := true
var lastEventBlock types.BlockNumber
var lastEventBlockHash types.Hash
go func() {
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
log.Info("Chain subscription context done")
return

case <-watchdog.C:
case <-watchdog.C():
if !eventArrived {
log.WithField("block", lastEventBlockHash.Hex()).Warn("Watchdog event timeout")

// read missed blocks
lastEventBlock, err := b.RPC.Chain.GetBlock(lastEventBlockHash)
if err != nil {
log.WithError(err).Warn("Error fetching block")
break
}
lastEventBlockNumber := lastEventBlock.Block.Header.Number
headerLatest, err := b.RPC.Chain.GetHeaderLatest()
if err != nil {
log.WithError(err).Warn("Error fetching latest header")
} else if headerLatest.Number > lastEventBlockNumber {
for i := lastEventBlockNumber + 1; i <= headerLatest.Number; i++ {
missedBlock, err := b.RPC.Chain.GetBlockHash(uint64(i))
if err != nil {
log.Println(err)
continue
}
storageData, err := b.RPC.State.GetStorageRaw(key, missedBlock)
if err != nil {
log.WithError(err).Error("Error fetching storage data")
continue
}
events, err := b.eventDecoder.DecodeEvents(*storageData, meta)
if err != nil {
log.WithError(err).Error("Error parsing events")
continue
}

b.processEvents(events, missedBlock)
lastEventBlockHash = missedBlock
}
}

// try to resubscribe
s, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})
if err != nil {
log.WithError(err).Warn("Watchdog resubscribtion failed")
break
}
log.Info("Watchdog event resubscribed")
sub.Unsubscribe()
sub = s
sub = chainSubscriptionFactory.NewChainSubscription(s)
}
eventArrived = false

Expand All @@ -108,13 +159,7 @@ func (b *blockchainClient) listenContractEvents() error {
break
}
eventArrived = true
block, err := b.RPC.Chain.GetBlock(evt.Block)
if err != nil {
log.WithError(err).Warn("Error fetching block")
break
}
lastEventBlock = block.Block.Header.Number
print(lastEventBlock)
lastEventBlockHash = evt.Block

// parse all events for this block
for _, chng := range evt.Changes {
Expand All @@ -123,54 +168,58 @@ func (b *blockchainClient) listenContractEvents() error {
continue
}

events := types.EventRecords{}
err = types.EventRecordsRaw(chng.StorageData).DecodeEventRecords(meta, &events)
storageData := chng.StorageData
events, err := b.eventDecoder.DecodeEvents(storageData, meta)
if err != nil {
log.WithError(err).Warnf("Error parsing event %x", chng.StorageData[:])
log.WithError(err).Warnf("Error parsing event %x", storageData[:])
continue
}

for _, e := range events.Contracts_ContractEmitted {
if !b.eventContractAccount.Equal(&e.Contract) {
continue
}

// Identify the event by matching one of its topics against known signatures. The topics are sorted so
// the needed one may be in the arbitrary position.
var dispatchEntry sdktypes.ContractEventDispatchEntry
found := false
for _, topic := range e.Topics {
dispatchEntry, found = b.eventDispatcher[topic]
if found {
break
}
}
if !found {
log.WithField("block", evt.Block.Hex()).
Warnf("Unknown event emitted by our contract: %x", e.Data[:16])
continue
}

if dispatchEntry.Handler == nil {
log.WithField("block", evt.Block.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debug("Event unhandeled")
continue
}
args := reflect.New(dispatchEntry.ArgumentType).Interface()
if err := codec.Decode(e.Data[1:], args); err != nil {
log.WithError(err).WithField("block", evt.Block.Hex()).
WithField("event", dispatchEntry.ArgumentType.Name()).
Errorf("Cannot decode event data %x", e.Data)
}
log.WithField("block", evt.Block.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debugf("Event args: %x", e.Data)
dispatchEntry.Handler(args)
}
b.processEvents(events, evt.Block)
}
}
}
}()
return nil
}

func (b *blockchainClient) processEvents(events *types.EventRecords, blockHash types.Hash) {
for _, e := range events.Contracts_ContractEmitted {
if !b.eventContractAccount.Equal(&e.Contract) {
continue
}

// Identify the event by matching one of its topics against known signatures. The topics are sorted so
// the needed one may be in the arbitrary position.
var dispatchEntry sdktypes.ContractEventDispatchEntry
found := false
for _, topic := range e.Topics {
dispatchEntry, found = b.eventDispatcher[topic]
if found {
break
}
}
if !found {

log.WithField("block", blockHash.Hex()).
Warnf("Unknown event emitted by our contract: %x", e.Data[:uint32(math.Min(16, float64(len(e.Data))))])
continue
}

if dispatchEntry.Handler == nil {
log.WithField("block", blockHash.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debug("Event unhandeled")
continue
}
args := reflect.New(dispatchEntry.ArgumentType).Interface()
if err := codec.Decode(e.Data[1:], args); err != nil {
log.WithError(err).WithField("block", blockHash.Hex()).
WithField("event", dispatchEntry.ArgumentType.Name()).
Errorf("Cannot decode event data %x", e.Data)
}
log.WithField("block", blockHash.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debugf("Event args: %x", e.Data)
dispatchEntry.Handler(args)
}
}

func (b *blockchainClient) CallToReadEncoded(contractAddressSS58 string, fromAddress string, method []byte, args ...interface{}) (string, error) {
Expand Down Expand Up @@ -410,9 +459,8 @@ func (b *blockchainClient) reconnect() error {
return nil
}

if b.eventContextCancel != nil {
b.eventContextCancel()
}
b.unsubscribeAll()

substrateAPI, err := gsrpc.NewSubstrateAPI(b.Client.URL())
if err != nil {
log.WithError(err).Warningf("Blockchain client can't reconnect to %s", b.Client.URL())
Expand All @@ -428,3 +476,10 @@ func (b *blockchainClient) reconnect() error {

return nil
}

func (b *blockchainClient) unsubscribeAll() {
for _, c := range b.eventContextCancel {
c()
}
b.eventContextCancel = nil
}
Loading

0 comments on commit 24f3bb7

Please sign in to comment.