From 24f3bb7ca621d151793f09eec26f269f4248141c Mon Sep 17 00:00:00 2001 From: Max Levitskiy Date: Mon, 3 Jul 2023 14:48:45 +0200 Subject: [PATCH] [DDC | Storage Cluster | Delivery set 17] (Subscribe storage node to change topology blockchain event) Read events from the blockchain seens last block in case websocket was disconnected Add Cid field to Ack (#42) --- contract/go.mod | 1 + contract/go.sum | 1 + contract/pkg/bucket/messages_test.go | 4 +- contract/pkg/client.go | 169 ++++++++----- contract/pkg/client_test.go | 228 ++++++++++++++++++ .../mock/subscription_ChainSubscription.go | 75 ++++++ .../subscription_ChainSubscriptionFactory.go | 50 ++++ .../pkg/mock/subscription_EventDecoder.go | 50 ++++ contract/pkg/mock/subscription_Watchdog.go | 49 ++++ .../pkg/mock/subscription_WatchdogFactory.go | 50 ++++ contract/pkg/subscription/event_decorer.go | 27 +++ contract/pkg/subscription/subscription.go | 49 ++++ contract/pkg/subscription/watchdog.go | 37 +++ ddc-schemas | 2 +- go.work.sum | 4 +- 15 files changed, 735 insertions(+), 61 deletions(-) create mode 100644 contract/pkg/client_test.go create mode 100644 contract/pkg/mock/subscription_ChainSubscription.go create mode 100644 contract/pkg/mock/subscription_ChainSubscriptionFactory.go create mode 100644 contract/pkg/mock/subscription_EventDecoder.go create mode 100644 contract/pkg/mock/subscription_Watchdog.go create mode 100644 contract/pkg/mock/subscription_WatchdogFactory.go create mode 100644 contract/pkg/subscription/event_decorer.go create mode 100644 contract/pkg/subscription/subscription.go create mode 100644 contract/pkg/subscription/watchdog.go diff --git a/contract/go.mod b/contract/go.mod index 4be2a4b..e438000 100644 --- a/contract/go.mod +++ b/contract/go.mod @@ -4,6 +4,7 @@ require ( github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.8 github.com/decred/base58 v1.0.3 github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 + github.com/golang/mock v1.3.1 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 diff --git a/contract/go.sum b/contract/go.sum index 017d5a0..41f34a1 100644 --- a/contract/go.sum +++ b/contract/go.sum @@ -153,6 +153,7 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 h1:5ZkaAPbicIKTF github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/contract/pkg/bucket/messages_test.go b/contract/pkg/bucket/messages_test.go index df9dcc1..2553236 100644 --- a/contract/pkg/bucket/messages_test.go +++ b/contract/pkg/bucket/messages_test.go @@ -2,7 +2,7 @@ package bucket import ( "encoding/hex" - "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg" + "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/utils" "strings" "testing" @@ -16,7 +16,7 @@ func TestBucketWriteAccess(t *testing.T) { publicKey := "0xd049e851567f16d68523a645ee96465ceb678ad983fc08e6a38408ad10410c4d" publicKeyB, _ := hex.DecodeString(strings.TrimPrefix(publicKey, "0x")) - accountID, _ := pkg.DecodeAccountIDFromSS58(ss58) + accountID, _ := utils.DecodeAccountIDFromSS58(ss58) bucketStatus := &BucketStatus{WriterIds: []types.AccountID{accountID}} //when diff --git a/contract/pkg/client.go b/contract/pkg/client.go index 45aaa82..184d98f 100644 --- a/contract/pkg/client.go +++ b/contract/pkg/client.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/sdktypes" + "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription" "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/utils" + "math" "os/signal" "reflect" "sync" @@ -23,13 +25,20 @@ const ( CERE = 10_000_000_000 ) +var ( + chainSubscriptionFactory = subscription.NewChainFactory() + watchdogFactory = subscription.NewWatchdogFactory() + watchdogTimeout = time.Minute +) + type ( blockchainClient struct { *gsrpc.SubstrateAPI eventContractAccount types.AccountID eventDispatcher map[types.Hash]sdktypes.ContractEventDispatchEntry - eventContextCancel context.CancelFunc + eventContextCancel []context.CancelFunc connectMutex sync.Mutex + eventDecoder subscription.EventDecoder } ) @@ -41,6 +50,7 @@ func CreateBlockchainClient(apiUrl string) sdktypes.BlockchainClient { return &blockchainClient{ SubstrateAPI: substrateAPI, + eventDecoder: subscription.NewEventDecoder(), } } @@ -69,25 +79,66 @@ func (b *blockchainClient) listenContractEvents() error { return err } - sub, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key}) + s, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key}) if err != nil { return err } + b.processChainSubscription(chainSubscriptionFactory.NewChainSubscription(s), key, meta) + return nil +} +func (b *blockchainClient) processChainSubscription(sub subscription.ChainSubscription, key types.StorageKey, meta *types.Metadata) { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - b.eventContextCancel = cancel - watchdog := time.NewTicker(time.Minute) + b.eventContextCancel = append(b.eventContextCancel, cancel) + watchdog := watchdogFactory.NewWatchdog(watchdogTimeout) eventArrived := true - var lastEventBlock types.BlockNumber + var lastEventBlockHash types.Hash go func() { defer sub.Unsubscribe() for { select { case <-ctx.Done(): + log.Info("Chain subscription context done") return - case <-watchdog.C: + case <-watchdog.C(): if !eventArrived { + log.WithField("block", lastEventBlockHash.Hex()).Warn("Watchdog event timeout") + + // read missed blocks + lastEventBlock, err := b.RPC.Chain.GetBlock(lastEventBlockHash) + if err != nil { + log.WithError(err).Warn("Error fetching block") + break + } + lastEventBlockNumber := lastEventBlock.Block.Header.Number + headerLatest, err := b.RPC.Chain.GetHeaderLatest() + if err != nil { + log.WithError(err).Warn("Error fetching latest header") + } else if headerLatest.Number > lastEventBlockNumber { + for i := lastEventBlockNumber + 1; i <= headerLatest.Number; i++ { + missedBlock, err := b.RPC.Chain.GetBlockHash(uint64(i)) + if err != nil { + log.Println(err) + continue + } + storageData, err := b.RPC.State.GetStorageRaw(key, missedBlock) + if err != nil { + log.WithError(err).Error("Error fetching storage data") + continue + } + events, err := b.eventDecoder.DecodeEvents(*storageData, meta) + if err != nil { + log.WithError(err).Error("Error parsing events") + continue + } + + b.processEvents(events, missedBlock) + lastEventBlockHash = missedBlock + } + } + + // try to resubscribe s, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key}) if err != nil { log.WithError(err).Warn("Watchdog resubscribtion failed") @@ -95,7 +146,7 @@ func (b *blockchainClient) listenContractEvents() error { } log.Info("Watchdog event resubscribed") sub.Unsubscribe() - sub = s + sub = chainSubscriptionFactory.NewChainSubscription(s) } eventArrived = false @@ -108,13 +159,7 @@ func (b *blockchainClient) listenContractEvents() error { break } eventArrived = true - block, err := b.RPC.Chain.GetBlock(evt.Block) - if err != nil { - log.WithError(err).Warn("Error fetching block") - break - } - lastEventBlock = block.Block.Header.Number - print(lastEventBlock) + lastEventBlockHash = evt.Block // parse all events for this block for _, chng := range evt.Changes { @@ -123,54 +168,58 @@ func (b *blockchainClient) listenContractEvents() error { continue } - events := types.EventRecords{} - err = types.EventRecordsRaw(chng.StorageData).DecodeEventRecords(meta, &events) + storageData := chng.StorageData + events, err := b.eventDecoder.DecodeEvents(storageData, meta) if err != nil { - log.WithError(err).Warnf("Error parsing event %x", chng.StorageData[:]) + log.WithError(err).Warnf("Error parsing event %x", storageData[:]) continue } - for _, e := range events.Contracts_ContractEmitted { - if !b.eventContractAccount.Equal(&e.Contract) { - continue - } - - // Identify the event by matching one of its topics against known signatures. The topics are sorted so - // the needed one may be in the arbitrary position. - var dispatchEntry sdktypes.ContractEventDispatchEntry - found := false - for _, topic := range e.Topics { - dispatchEntry, found = b.eventDispatcher[topic] - if found { - break - } - } - if !found { - log.WithField("block", evt.Block.Hex()). - Warnf("Unknown event emitted by our contract: %x", e.Data[:16]) - continue - } - - if dispatchEntry.Handler == nil { - log.WithField("block", evt.Block.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()). - Debug("Event unhandeled") - continue - } - args := reflect.New(dispatchEntry.ArgumentType).Interface() - if err := codec.Decode(e.Data[1:], args); err != nil { - log.WithError(err).WithField("block", evt.Block.Hex()). - WithField("event", dispatchEntry.ArgumentType.Name()). - Errorf("Cannot decode event data %x", e.Data) - } - log.WithField("block", evt.Block.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()). - Debugf("Event args: %x", e.Data) - dispatchEntry.Handler(args) - } + b.processEvents(events, evt.Block) } } } }() - return nil +} + +func (b *blockchainClient) processEvents(events *types.EventRecords, blockHash types.Hash) { + for _, e := range events.Contracts_ContractEmitted { + if !b.eventContractAccount.Equal(&e.Contract) { + continue + } + + // Identify the event by matching one of its topics against known signatures. The topics are sorted so + // the needed one may be in the arbitrary position. + var dispatchEntry sdktypes.ContractEventDispatchEntry + found := false + for _, topic := range e.Topics { + dispatchEntry, found = b.eventDispatcher[topic] + if found { + break + } + } + if !found { + + log.WithField("block", blockHash.Hex()). + Warnf("Unknown event emitted by our contract: %x", e.Data[:uint32(math.Min(16, float64(len(e.Data))))]) + continue + } + + if dispatchEntry.Handler == nil { + log.WithField("block", blockHash.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()). + Debug("Event unhandeled") + continue + } + args := reflect.New(dispatchEntry.ArgumentType).Interface() + if err := codec.Decode(e.Data[1:], args); err != nil { + log.WithError(err).WithField("block", blockHash.Hex()). + WithField("event", dispatchEntry.ArgumentType.Name()). + Errorf("Cannot decode event data %x", e.Data) + } + log.WithField("block", blockHash.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()). + Debugf("Event args: %x", e.Data) + dispatchEntry.Handler(args) + } } func (b *blockchainClient) CallToReadEncoded(contractAddressSS58 string, fromAddress string, method []byte, args ...interface{}) (string, error) { @@ -410,9 +459,8 @@ func (b *blockchainClient) reconnect() error { return nil } - if b.eventContextCancel != nil { - b.eventContextCancel() - } + b.unsubscribeAll() + substrateAPI, err := gsrpc.NewSubstrateAPI(b.Client.URL()) if err != nil { log.WithError(err).Warningf("Blockchain client can't reconnect to %s", b.Client.URL()) @@ -428,3 +476,10 @@ func (b *blockchainClient) reconnect() error { return nil } + +func (b *blockchainClient) unsubscribeAll() { + for _, c := range b.eventContextCancel { + c() + } + b.eventContextCancel = nil +} diff --git a/contract/pkg/client_test.go b/contract/pkg/client_test.go new file mode 100644 index 0000000..3c5b51e --- /dev/null +++ b/contract/pkg/client_test.go @@ -0,0 +1,228 @@ +package pkg + +import ( + gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" + "github.com/centrifuge/go-substrate-rpc-client/v4/rpc" + "github.com/centrifuge/go-substrate-rpc-client/v4/types" + "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/mock" + "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/sdktypes" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "reflect" + "testing" + "time" +) + +func Test_blockchainClient_processChainSubscription(t *testing.T) { + // given + mockController := gomock.NewController(t) + chainSubscriptionMock := mock.NewMockChainSubscription(mockController) + chainSubscriptionMock.EXPECT().Err().Return(make(chan error)).AnyTimes() + storageChangeSetsChan := make(chan types.StorageChangeSet) + chainSubscriptionMock.EXPECT().Chan().Return(storageChangeSetsChan).AnyTimes() + chainSubscriptionMock.EXPECT().Unsubscribe().AnyTimes() + + chainSubscriptionFactoryMock := mock.NewMockChainSubscriptionFactory(mockController) + chainSubscriptionFactoryMock.EXPECT().NewChainSubscription(gomock.Any()).Return(chainSubscriptionMock).AnyTimes() + chainSubscriptionFactory = chainSubscriptionFactoryMock + + stateMock := mock.NewMockState(mockController) + stateMock.EXPECT().SubscribeStorageRaw(gomock.Any()).Return(nil, nil).AnyTimes() + chainMock := mock.NewMockChain(mockController) + //chainMock.EXPECT().GetBlock(gomock.Any()).Return(nil, nil).AnyTimes() + + watchdogMock := mock.NewMockWatchdog(mockController) + watchdogChan := make(chan time.Time) + watchdogMock.EXPECT().C().Return(watchdogChan).AnyTimes() + watchdogFactoryMock := mock.NewMockWatchdogFactory(mockController) + watchdogFactoryMock.EXPECT().NewWatchdog(gomock.Any()).Return(watchdogMock).AnyTimes() + watchdogFactory = watchdogFactoryMock + + eventDecoderMock := mock.NewMockEventDecoder(mockController) + + substrateAPI := &gsrpc.SubstrateAPI{ + RPC: &rpc.RPC{ + State: stateMock, + Chain: chainMock, + }, + } + + c := blockchainClient{ + SubstrateAPI: substrateAPI, + eventDecoder: eventDecoderMock, + } + c.eventDispatcher = make(map[types.Hash]sdktypes.ContractEventDispatchEntry) + + t.Run("should resubscribe when timeout", func(t *testing.T) { + h := types.NewHash([]byte{1}) + handlerChan := make(chan bool) + ticker := time.NewTicker(1 * time.Second) + go func() { + <-ticker.C + handlerChan <- false + }() + c.eventDispatcher[h] = sdktypes.ContractEventDispatchEntry{ + Handler: func(_ interface{}) { + handlerChan <- true + }, + ArgumentType: reflect.TypeOf(interface{}("")), + } + storageKey := h[:] + events := &types.EventRecords{ + Contracts_ContractEmitted: []types.EventContractsContractEmitted{ + { + Data: []byte("test"), + Topics: []types.Hash{ + h, + }, + }, + }, + } + eventDecoderMock.EXPECT().DecodeEvents(gomock.Any(), gomock.Any()).Return(events, nil).Times(1) + + c.processChainSubscription(chainSubscriptionMock, storageKey, createMetadata()) + watchdogChan <- time.Now() + storageChangeSetsChan <- types.StorageChangeSet{ + Changes: []types.KeyValueOption{ + { + HasStorageData: true, + StorageKey: storageKey, + }, + }, + } + assert.True(t, <-handlerChan) + c.unsubscribeAll() + }) + t.Run("should read missed events from the blockchain before resubscribing", func(t *testing.T) { + + h := types.NewHash([]byte{2}) + storageKey := h[:] + + metadata := createMetadata() + c.processChainSubscription(chainSubscriptionMock, storageKey, metadata) + + // trigger to save last block + events := &types.EventRecords{ + Contracts_ContractEmitted: []types.EventContractsContractEmitted{ + { + Data: []byte("test"), + Topics: []types.Hash{ + h, + }, + }, + }, + } + eventDecoderMock.EXPECT().DecodeEvents(gomock.Any(), gomock.Any()).Return(events, nil).Times(1) + lastBlockHash := types.NewHash([]byte{123}) + storageChangeSetsChan <- types.StorageChangeSet{ + Block: lastBlockHash, + Changes: []types.KeyValueOption{ + { + HasStorageData: true, + StorageKey: storageKey, + }, + }, + } + + // nothing after first watchdog tick + watchdogChan <- time.Now() + + // Set up mocks + // getting block number by hash + lastSignedBlock := &types.SignedBlock{ + Block: types.Block{ + Header: types.Header{ + Number: 1, + }, + }, + } + chainMock.EXPECT().GetBlock(lastBlockHash).Return(lastSignedBlock, nil).Times(1) + // getting current block number + currentSignedBlock := &types.Header{ + Number: 3, + } + chainMock.EXPECT().GetHeaderLatest().Return(currentSignedBlock, nil).Times(1) + + // return block hash by number + missedBlockHash2 := types.NewHash([]byte{22}) + missedBlockHash3 := types.NewHash([]byte{33}) + chainMock.EXPECT().GetBlockHash(uint64(2)).Return(missedBlockHash2, nil).Times(1) + chainMock.EXPECT().GetBlockHash(uint64(3)).Return(missedBlockHash3, nil).Times(1) + + // return storage data for missed blocks + storageDataRaw1 := types.StorageDataRaw{1} + storageDataRaw2 := types.StorageDataRaw{2} + stateMock.EXPECT().GetStorageRaw(storageKey, missedBlockHash2).Return(&storageDataRaw1, nil).Times(1) + stateMock.EXPECT().GetStorageRaw(storageKey, missedBlockHash3).Return(&storageDataRaw2, nil).Times(1) + + // return events for missed blocks + events2 := &types.EventRecords{ + Contracts_ContractEmitted: []types.EventContractsContractEmitted{ + { + Data: []byte("test2"), + Topics: []types.Hash{ + h, + }, + }, + }, + } + events3 := &types.EventRecords{ + Contracts_ContractEmitted: []types.EventContractsContractEmitted{ + { + Data: []byte("test3"), + Topics: []types.Hash{ + h, + }, + }, + }, + } + eventDecoderMock.EXPECT().DecodeEvents([]byte{1}, metadata).Return(events2, nil).Times(1) + eventDecoderMock.EXPECT().DecodeEvents([]byte{2}, metadata).Return(events3, nil).Times(1) + + // add subscriber to event + handlerChan := make(chan bool) + ticker := time.NewTicker(1 * time.Second) + go func() { + <-ticker.C + handlerChan <- false + }() + c.eventDispatcher[h] = sdktypes.ContractEventDispatchEntry{ + Handler: func(_ interface{}) { + handlerChan <- true + }, + ArgumentType: reflect.TypeOf(interface{}("")), + } + + // trigger second time, because we resubscribe if there is no event between two watchdog ticks + watchdogChan <- time.Now() + assert.True(t, <-handlerChan) + + c.unsubscribeAll() + }) + +} + +func createMetadata() *types.Metadata { + metadata := &types.Metadata{ + Version: 14, + AsMetadataV14: types.MetadataV14{ + Pallets: []types.PalletMetadataV14{ + { + HasStorage: true, + Storage: types.StorageMetadataV14{ + Prefix: "System", + Items: []types.StorageEntryMetadataV14{ + { + Name: "Events", + Type: types.StorageEntryTypeV14{ + IsPlainType: true, + }, + }, + }, + }, + }, + }, + }, + } + return metadata +} diff --git a/contract/pkg/mock/subscription_ChainSubscription.go b/contract/pkg/mock/subscription_ChainSubscription.go new file mode 100644 index 0000000..0605955 --- /dev/null +++ b/contract/pkg/mock/subscription_ChainSubscription.go @@ -0,0 +1,75 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription (interfaces: ChainSubscription) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + types "github.com/centrifuge/go-substrate-rpc-client/v4/types" + gomock "github.com/golang/mock/gomock" +) + +// MockChainSubscription is a mock of ChainSubscription interface. +type MockChainSubscription struct { + ctrl *gomock.Controller + recorder *MockChainSubscriptionMockRecorder +} + +// MockChainSubscriptionMockRecorder is the mock recorder for MockChainSubscription. +type MockChainSubscriptionMockRecorder struct { + mock *MockChainSubscription +} + +// NewMockChainSubscription creates a new mock instance. +func NewMockChainSubscription(ctrl *gomock.Controller) *MockChainSubscription { + mock := &MockChainSubscription{ctrl: ctrl} + mock.recorder = &MockChainSubscriptionMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChainSubscription) EXPECT() *MockChainSubscriptionMockRecorder { + return m.recorder +} + +// Chan mocks base method. +func (m *MockChainSubscription) Chan() <-chan types.StorageChangeSet { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Chan") + ret0, _ := ret[0].(<-chan types.StorageChangeSet) + return ret0 +} + +// Chan indicates an expected call of Chan. +func (mr *MockChainSubscriptionMockRecorder) Chan() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Chan", reflect.TypeOf((*MockChainSubscription)(nil).Chan)) +} + +// Err mocks base method. +func (m *MockChainSubscription) Err() <-chan error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(<-chan error) + return ret0 +} + +// Err indicates an expected call of Err. +func (mr *MockChainSubscriptionMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockChainSubscription)(nil).Err)) +} + +// Unsubscribe mocks base method. +func (m *MockChainSubscription) Unsubscribe() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Unsubscribe") +} + +// Unsubscribe indicates an expected call of Unsubscribe. +func (mr *MockChainSubscriptionMockRecorder) Unsubscribe() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockChainSubscription)(nil).Unsubscribe)) +} diff --git a/contract/pkg/mock/subscription_ChainSubscriptionFactory.go b/contract/pkg/mock/subscription_ChainSubscriptionFactory.go new file mode 100644 index 0000000..79a3eae --- /dev/null +++ b/contract/pkg/mock/subscription_ChainSubscriptionFactory.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription (interfaces: ChainSubscriptionFactory) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + state "github.com/centrifuge/go-substrate-rpc-client/v4/rpc/state" + subscription "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription" + gomock "github.com/golang/mock/gomock" +) + +// MockChainSubscriptionFactory is a mock of ChainSubscriptionFactory interface. +type MockChainSubscriptionFactory struct { + ctrl *gomock.Controller + recorder *MockChainSubscriptionFactoryMockRecorder +} + +// MockChainSubscriptionFactoryMockRecorder is the mock recorder for MockChainSubscriptionFactory. +type MockChainSubscriptionFactoryMockRecorder struct { + mock *MockChainSubscriptionFactory +} + +// NewMockChainSubscriptionFactory creates a new mock instance. +func NewMockChainSubscriptionFactory(ctrl *gomock.Controller) *MockChainSubscriptionFactory { + mock := &MockChainSubscriptionFactory{ctrl: ctrl} + mock.recorder = &MockChainSubscriptionFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChainSubscriptionFactory) EXPECT() *MockChainSubscriptionFactoryMockRecorder { + return m.recorder +} + +// NewChainSubscription mocks base method. +func (m *MockChainSubscriptionFactory) NewChainSubscription(arg0 *state.StorageSubscription) subscription.ChainSubscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewChainSubscription", arg0) + ret0, _ := ret[0].(subscription.ChainSubscription) + return ret0 +} + +// NewChainSubscription indicates an expected call of NewChainSubscription. +func (mr *MockChainSubscriptionFactoryMockRecorder) NewChainSubscription(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewChainSubscription", reflect.TypeOf((*MockChainSubscriptionFactory)(nil).NewChainSubscription), arg0) +} diff --git a/contract/pkg/mock/subscription_EventDecoder.go b/contract/pkg/mock/subscription_EventDecoder.go new file mode 100644 index 0000000..902622f --- /dev/null +++ b/contract/pkg/mock/subscription_EventDecoder.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription (interfaces: EventDecoder) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + types "github.com/centrifuge/go-substrate-rpc-client/v4/types" + gomock "github.com/golang/mock/gomock" +) + +// MockEventDecoder is a mock of EventDecoder interface. +type MockEventDecoder struct { + ctrl *gomock.Controller + recorder *MockEventDecoderMockRecorder +} + +// MockEventDecoderMockRecorder is the mock recorder for MockEventDecoder. +type MockEventDecoderMockRecorder struct { + mock *MockEventDecoder +} + +// NewMockEventDecoder creates a new mock instance. +func NewMockEventDecoder(ctrl *gomock.Controller) *MockEventDecoder { + mock := &MockEventDecoder{ctrl: ctrl} + mock.recorder = &MockEventDecoderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventDecoder) EXPECT() *MockEventDecoderMockRecorder { + return m.recorder +} + +// DecodeEvents mocks base method. +func (m *MockEventDecoder) DecodeEvents(arg0 types.StorageDataRaw, arg1 *types.Metadata) (*types.EventRecords, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DecodeEvents", arg0, arg1) + ret0, _ := ret[0].(*types.EventRecords) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DecodeEvents indicates an expected call of DecodeEvents. +func (mr *MockEventDecoderMockRecorder) DecodeEvents(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DecodeEvents", reflect.TypeOf((*MockEventDecoder)(nil).DecodeEvents), arg0, arg1) +} diff --git a/contract/pkg/mock/subscription_Watchdog.go b/contract/pkg/mock/subscription_Watchdog.go new file mode 100644 index 0000000..725dee5 --- /dev/null +++ b/contract/pkg/mock/subscription_Watchdog.go @@ -0,0 +1,49 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription (interfaces: Watchdog) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" +) + +// MockWatchdog is a mock of Watchdog interface. +type MockWatchdog struct { + ctrl *gomock.Controller + recorder *MockWatchdogMockRecorder +} + +// MockWatchdogMockRecorder is the mock recorder for MockWatchdog. +type MockWatchdogMockRecorder struct { + mock *MockWatchdog +} + +// NewMockWatchdog creates a new mock instance. +func NewMockWatchdog(ctrl *gomock.Controller) *MockWatchdog { + mock := &MockWatchdog{ctrl: ctrl} + mock.recorder = &MockWatchdogMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWatchdog) EXPECT() *MockWatchdogMockRecorder { + return m.recorder +} + +// C mocks base method. +func (m *MockWatchdog) C() <-chan time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "C") + ret0, _ := ret[0].(<-chan time.Time) + return ret0 +} + +// C indicates an expected call of C. +func (mr *MockWatchdogMockRecorder) C() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "C", reflect.TypeOf((*MockWatchdog)(nil).C)) +} diff --git a/contract/pkg/mock/subscription_WatchdogFactory.go b/contract/pkg/mock/subscription_WatchdogFactory.go new file mode 100644 index 0000000..9523b0d --- /dev/null +++ b/contract/pkg/mock/subscription_WatchdogFactory.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription (interfaces: WatchdogFactory) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + time "time" + + subscription "github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription" + gomock "github.com/golang/mock/gomock" +) + +// MockWatchdogFactory is a mock of WatchdogFactory interface. +type MockWatchdogFactory struct { + ctrl *gomock.Controller + recorder *MockWatchdogFactoryMockRecorder +} + +// MockWatchdogFactoryMockRecorder is the mock recorder for MockWatchdogFactory. +type MockWatchdogFactoryMockRecorder struct { + mock *MockWatchdogFactory +} + +// NewMockWatchdogFactory creates a new mock instance. +func NewMockWatchdogFactory(ctrl *gomock.Controller) *MockWatchdogFactory { + mock := &MockWatchdogFactory{ctrl: ctrl} + mock.recorder = &MockWatchdogFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWatchdogFactory) EXPECT() *MockWatchdogFactoryMockRecorder { + return m.recorder +} + +// NewWatchdog mocks base method. +func (m *MockWatchdogFactory) NewWatchdog(arg0 time.Duration) subscription.Watchdog { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewWatchdog", arg0) + ret0, _ := ret[0].(subscription.Watchdog) + return ret0 +} + +// NewWatchdog indicates an expected call of NewWatchdog. +func (mr *MockWatchdogFactoryMockRecorder) NewWatchdog(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewWatchdog", reflect.TypeOf((*MockWatchdogFactory)(nil).NewWatchdog), arg0) +} diff --git a/contract/pkg/subscription/event_decorer.go b/contract/pkg/subscription/event_decorer.go new file mode 100644 index 0000000..608b44e --- /dev/null +++ b/contract/pkg/subscription/event_decorer.go @@ -0,0 +1,27 @@ +package subscription + +import "github.com/centrifuge/go-substrate-rpc-client/v4/types" + +//go:generate mockgen -destination ../mock/subscription_EventDecoder.go -package mock github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription EventDecoder + +type EventDecoder interface { + DecodeEvents(data types.StorageDataRaw, m *types.Metadata) (*types.EventRecords, error) +} + +func NewEventDecoder() EventDecoder { + return &eventDecoder{} +} + +var _ EventDecoder = &eventDecoder{} + +type eventDecoder struct { +} + +func (e *eventDecoder) DecodeEvents(data types.StorageDataRaw, m *types.Metadata) (*types.EventRecords, error) { + events := types.EventRecords{} + err := types.EventRecordsRaw(data).DecodeEventRecords(m, &events) + if err != nil { + return nil, err + } + return &events, nil +} diff --git a/contract/pkg/subscription/subscription.go b/contract/pkg/subscription/subscription.go new file mode 100644 index 0000000..cc87e80 --- /dev/null +++ b/contract/pkg/subscription/subscription.go @@ -0,0 +1,49 @@ +package subscription + +import ( + "github.com/centrifuge/go-substrate-rpc-client/v4/rpc/state" + "github.com/centrifuge/go-substrate-rpc-client/v4/types" +) + +//go:generate mockgen -destination ../mock/subscription_ChainSubscription.go -package mock github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription ChainSubscription +//go:generate mockgen -destination ../mock/subscription_ChainSubscriptionFactory.go -package mock github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription ChainSubscriptionFactory + +type ChainSubscription interface { + Chan() <-chan types.StorageChangeSet + Err() <-chan error + Unsubscribe() +} + +type ChainSubscriptionFactory interface { + NewChainSubscription(sub *state.StorageSubscription) ChainSubscription +} + +var _ ChainSubscriptionFactory = &chainSubscriptionFactory{} + +type chainSubscriptionFactory struct { +} + +func NewChainFactory() ChainSubscriptionFactory { + return &chainSubscriptionFactory{} +} +func (p *chainSubscriptionFactory) NewChainSubscription(sub *state.StorageSubscription) ChainSubscription { + return &chainSubscription{sub} +} + +var _ ChainSubscription = &chainSubscription{} + +type chainSubscription struct { + sub *state.StorageSubscription +} + +func (c *chainSubscription) Chan() <-chan types.StorageChangeSet { + return c.sub.Chan() +} + +func (c *chainSubscription) Err() <-chan error { + return c.sub.Err() +} + +func (c *chainSubscription) Unsubscribe() { + c.sub.Unsubscribe() +} diff --git a/contract/pkg/subscription/watchdog.go b/contract/pkg/subscription/watchdog.go new file mode 100644 index 0000000..b7d176f --- /dev/null +++ b/contract/pkg/subscription/watchdog.go @@ -0,0 +1,37 @@ +package subscription + +import "time" + +//go:generate mockgen -destination=../mock/subscription_WatchdogFactory.go -package=mock github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription WatchdogFactory +//go:generate mockgen -destination=../mock/subscription_Watchdog.go -package=mock github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription Watchdog + +type Watchdog interface { + C() <-chan time.Time +} + +var _ Watchdog = &watchdog{} + +type watchdog struct { + *time.Ticker +} + +func (w *watchdog) C() <-chan time.Time { + return w.Ticker.C +} + +type WatchdogFactory interface { + NewWatchdog(timeout time.Duration) Watchdog +} + +type watchdogFactory struct { +} + +func (w *watchdogFactory) NewWatchdog(timeout time.Duration) Watchdog { + return &watchdog{ + Ticker: time.NewTicker(timeout), + } +} + +func NewWatchdogFactory() WatchdogFactory { + return &watchdogFactory{} +} diff --git a/ddc-schemas b/ddc-schemas index 3e4f5c7..b4d9841 160000 --- a/ddc-schemas +++ b/ddc-schemas @@ -1 +1 @@ -Subproject commit 3e4f5c709554972de98f2e3b9051090630d10c3f +Subproject commit b4d98417f1c10b0c4f9a141e34026401a045d266 diff --git a/go.work.sum b/go.work.sum index 204f7ae..55509d9 100644 --- a/go.work.sum +++ b/go.work.sum @@ -135,7 +135,6 @@ github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= -github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/google/certificate-transparency-go v1.0.21/go.mod h1:QeJfpSbVSfYc7RgB3gJFj9cbuQMMchQxrWXz8Ruopmg= @@ -217,11 +216,14 @@ go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk= go.opentelemetry.io/otel/exporters/jaeger v1.4.1/go.mod h1:ZW7vkOu9nC1CxsD8bHNHCia5JUbwP39vxgd1q4Z5rCI= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=