From 2328dd9ecf1c8f694ff61369675ea9c6b68c2df1 Mon Sep 17 00:00:00 2001 From: Mikhail Kornilov Date: Wed, 5 Aug 2020 20:53:49 +0300 Subject: [PATCH] Release: [DFI-594] VM retry (#186) * [DFI-575] LCS viewer (#182) * [DFI-575] VM mod: LCS viewer prototype added * [DFI-575] CLI output fix * [DFI-575] CLI output fix * [DFI-575] ModulePath support added * [DFI-575] Docs added * [DFI-575] REST API added + REST test; Cosmos SDK Swagger dependency fix * [DFI-594] vm mod: retry mechanism refactoring (option to disable request timeout) * [DFI-594] Linter fix * [DFI-594] Sleep added (useful for tests only) --- app/app.go | 2 +- app/common_test.go | 10 +- app/integ_gov_test.go | 2 +- app/integ_linux_test.go | 2 +- app/integ_test.go | 76 +-------- cmd/config/config.go | 25 ++- cmd/config/vm_config_template.go | 16 +- cmd/dncli/docs/swagger.go | 145 +++++++++++++++++- helpers/tests/clitester/cli_tester_configs.go | 10 +- helpers/tests/clitester/cli_tester_init.go | 3 +- helpers/tests/clitester/cli_tester_options.go | 5 +- helpers/tests/mockdvm/mockdvm.go | 85 ++++++++++ x/oracle/internal/types/price_test.go | 2 + x/vm/internal/keeper/common_test.go | 10 +- x/vm/internal/keeper/keeper_ds.go | 103 ++++++------- x/vm/internal/keeper/keeper_ds_test.go | 145 ++++++++++++++++++ 16 files changed, 456 insertions(+), 185 deletions(-) create mode 100644 helpers/tests/mockdvm/mockdvm.go create mode 100644 x/vm/internal/keeper/keeper_ds_test.go diff --git a/app/app.go b/app/app.go index c9a6b79a..92bf8e51 100644 --- a/app/app.go +++ b/app/app.go @@ -263,7 +263,7 @@ func NewDnServiceApp(logger log.Logger, db dbm.DB, config *config.VMConfig, invC ) // Upgrade handler with name matching proposal name should be registered here. - //app.upgradeKeeper.SetUpgradeHandler("My_update", func(ctx sdk.Context, plan upgrade.Plan) { }) + app.upgradeKeeper.SetUpgradeHandler("v0.6.1", func(ctx sdk.Context, plan upgrade.Plan) {}) // VMKeeper stores VM resources and interacts with DVM. app.vmKeeper = vm.NewKeeper( diff --git a/app/common_test.go b/app/common_test.go index 07d0ed8d..064cd364 100644 --- a/app/common_test.go +++ b/app/common_test.go @@ -236,12 +236,10 @@ func NewTestDnAppDVM(t *testing.T, logOpts ...log.Option) (*DnServiceApp, string // create VM config config := &vmConfig.VMConfig{ - Address: dvmAddr, - DataListen: dsAddr, - MaxAttempts: 10, - InitialBackoff: 500, - MaxBackoff: 1500, - BackoffMultiplier: 0.1, + Address: dvmAddr, + DataListen: dsAddr, + MaxAttempts: 10, + ReqTimeoutInMs: 1000, } // create app diff --git a/app/integ_gov_test.go b/app/integ_gov_test.go index 99bea77c..6a9d38e9 100644 --- a/app/integ_gov_test.go +++ b/app/integ_gov_test.go @@ -63,7 +63,7 @@ func TestIntegGov_StdlibUpdate(t *testing.T) { t, true, cliTester.DaemonLogLevelOption("x/vm/dsserver:info,x/vm:info,x/gov:info,main:info,state:info,*:error"), - cliTester.VMCommunicationOption(50, 1000, 100), + cliTester.VMCommunicationOption(5, 1000), cliTester.VMCommunicationBaseAddressNetOption("tcp://127.0.0.1"), ) defer ct.Close() diff --git a/app/integ_linux_test.go b/app/integ_linux_test.go index 2c646470..7c98605a 100644 --- a/app/integ_linux_test.go +++ b/app/integ_linux_test.go @@ -35,7 +35,7 @@ func TestIntegVM_CommunicationUDSOverDocker(t *testing.T) { ct := cliTester.New( t, false, - cliTester.VMCommunicationOption(50, 1000, 100), + cliTester.VMCommunicationOption(5, 1000), cliTester.VMCommunicationBaseAddressUDSOption(dsSocket, dvmSocket), ) defer ct.Close() diff --git a/app/integ_test.go b/app/integ_test.go index 78eeb9a7..723b3d2f 100644 --- a/app/integ_test.go +++ b/app/integ_test.go @@ -3,10 +3,8 @@ package app import ( - "context" "encoding/hex" "io/ioutil" - "net" "os" "path" "strings" @@ -15,75 +13,17 @@ import ( "time" "github.com/cosmos/cosmos-sdk/server" - "github.com/dfinance/dvm-proto/go/vm_grpc" "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - grpcStatus "google.golang.org/grpc/status" "github.com/dfinance/dnode/helpers" "github.com/dfinance/dnode/helpers/tests" cliTester "github.com/dfinance/dnode/helpers/tests/clitester" + "github.com/dfinance/dnode/helpers/tests/mockdvm" testUtils "github.com/dfinance/dnode/helpers/tests/utils" "github.com/dfinance/dnode/x/vm" "github.com/dfinance/dnode/x/vm/client/vm_client" ) -type MockDVM struct { - server *grpc.Server - failExecution bool - failResponse bool - execDelay time.Duration -} - -func (s *MockDVM) SetExecutionFail() { s.failExecution = true } -func (s *MockDVM) SetExecutionOK() { s.failExecution = false } -func (s *MockDVM) SetResponseFail() { s.failResponse = true } -func (s *MockDVM) SetResponseOK() { s.failResponse = false } -func (s *MockDVM) SetExecutionDelay(dur time.Duration) { - s.execDelay = dur -} -func (s *MockDVM) Stop() { - if s.server != nil { - s.server.Stop() - } -} - -func (s *MockDVM) PublishModule(ctx context.Context, in *vm_grpc.VMPublishModule) (*vm_grpc.VMExecuteResponse, error) { - if s.failExecution { - return nil, grpcStatus.Errorf(codes.Internal, "failing gRPC execution") - } - - resp := &vm_grpc.VMExecuteResponse{} - if !s.failResponse { - resp = &vm_grpc.VMExecuteResponse{ - WriteSet: nil, - Events: nil, - GasUsed: 1, - Status: vm_grpc.ContractStatus_Discard, - StatusStruct: nil, - } - } - - return resp, nil -} - -func StartMockDVMService(listener net.Listener) *MockDVM { - s := &MockDVM{ - execDelay: 100 * time.Millisecond, - } - - server := grpc.NewServer() - vm_grpc.RegisterVMModulePublisherServer(server, s) - - go func() { - server.Serve(listener) - }() - s.server = server - - return s -} - // Test dnode crash on VM Tx failure func TestInteg_ConsensusFailure(t *testing.T) { const script = ` @@ -167,7 +107,7 @@ func TestIntegVM_ExecuteScriptViaCLI(t *testing.T) { ct := cliTester.New( t, false, - cliTester.VMCommunicationOption(50, 1000, 100), + cliTester.VMCommunicationOption(5, 1000), cliTester.VMCommunicationBaseAddressNetOption("tcp://127.0.0.1"), ) defer ct.Close() @@ -254,7 +194,7 @@ func TestIntegVM_ExecuteScriptViaREST(t *testing.T) { ct := cliTester.New( t, false, - cliTester.VMCommunicationOption(50, 1000, 100), + cliTester.VMCommunicationOption(5, 1000), cliTester.VMCommunicationBaseAddressNetOption("tcp://127.0.0.1"), ) defer ct.Close() @@ -358,7 +298,7 @@ func TestIntegVM_DeployModuleViaCLI(t *testing.T) { ct := cliTester.New( t, false, - cliTester.VMCommunicationOption(50, 1000, 100), + cliTester.VMCommunicationOption(5, 1000), cliTester.VMCommunicationBaseAddressNetOption("tcp://127.0.0.1"), ) defer ct.Close() @@ -417,7 +357,7 @@ func TestIntegVM_DeployModuleViaREST(t *testing.T) { ct := cliTester.New( t, false, - cliTester.VMCommunicationOption(50, 1000, 100), + cliTester.VMCommunicationOption(5, 1000), cliTester.VMCommunicationBaseAddressNetOption("tcp://127.0.0.1"), ) defer ct.Close() @@ -486,7 +426,7 @@ func TestIntegVM_RequestRetry(t *testing.T) { ct := cliTester.New( t, true, - cliTester.VMCommunicationOption(100, 500, 10), + cliTester.VMCommunicationOption(5, 100), cliTester.VMCommunicationBaseAddressUDSOption(dsSocket, mockDVMSocket), ) defer ct.Close() @@ -496,7 +436,7 @@ func TestIntegVM_RequestRetry(t *testing.T) { mockDVMListener, err := helpers.GetGRpcNetListener("unix://" + mockDVMSocketPath) require.NoError(t, err, "creating MockDVM listener") - mockDvm := StartMockDVMService(mockDVMListener) + mockDvm := mockdvm.StartMockDVMService(mockDVMListener) defer mockDvm.Stop() require.NoError(t, testUtils.WaitForFileExists(mockDVMSocketPath, 1*time.Second), "MockDVM start failed") @@ -580,7 +520,7 @@ func TestIntegVM_CommunicationUDS(t *testing.T) { ct := cliTester.New( t, false, - cliTester.VMCommunicationOption(50, 1000, 100), + cliTester.VMCommunicationOption(5, 1000), cliTester.VMCommunicationBaseAddressUDSOption(dsSocket, dvmSocket), ) defer ct.Close() diff --git a/cmd/config/config.go b/cmd/config/config.go index c0c35e52..98de9153 100644 --- a/cmd/config/config.go +++ b/cmd/config/config.go @@ -31,10 +31,8 @@ const ( DefaultDataListen = "tcp://127.0.0.1:50052" // Default data server address to listen for connections from VM. // Default retry configs. - DefaultMaxAttempts = 0 // Default VM retry attempts. - DefaultInitialBackoff = 1000 // Default VM 100 milliseconds for retry attempts. - DefaultMaxBackoff = 2000 // Default VM max backoff. - DefaultBackoffMultiplier = 0.1 // Default backoff multiplayer (10) + DefaultMaxAttempts = 0 // Default maximum attempts for retry. + DefaultReqTimeout = 0 // Default request timeout per attempt [ms]. // Default governance params. DefaultGovMinDepositAmount = "100000000000000000000" // 100 dfi @@ -52,23 +50,18 @@ type VMConfig struct { Address string `mapstructure:"vm_address"` // address of virtual machine. DataListen string `mapstructure:"vm_data_listen"` // data listen. - // Retry policy. - // Example how backoff works - https://stackoverflow.com/questions/43224683/what-does-backoffmultiplier-mean-in-defaultretrypolicy. - MaxAttempts int `mapstructure:"vm_retry_max_attempts"` // maximum attempts for retry, for infinity retry - use 0. - InitialBackoff int `mapstructure:"vm_retry_initial_backoff"` // initial back off in ms. - MaxBackoff int `mapstructure:"vm_retry_max_backoff"` // max backoff in ms. - BackoffMultiplier float64 `mapstructure:"vm_retry_backoff_multiplier"` // backoff multiplier. + // Retry policy + MaxAttempts uint `mapstructure:"vm_retry_max_attempts"` // maximum attempts for retry (0 - infinity) + ReqTimeoutInMs uint `mapstructure:"vm_retry_req_timeout_ms"` // request timeout per attempt (0 - infinity) [ms] } // Default VM configuration. func DefaultVMConfig() *VMConfig { return &VMConfig{ - Address: DefaultVMAddress, - DataListen: DefaultDataListen, - MaxAttempts: DefaultMaxAttempts, - InitialBackoff: DefaultInitialBackoff, - MaxBackoff: DefaultMaxBackoff, - BackoffMultiplier: DefaultBackoffMultiplier, + Address: DefaultVMAddress, + DataListen: DefaultDataListen, + MaxAttempts: DefaultMaxAttempts, + ReqTimeoutInMs: DefaultReqTimeout, } } diff --git a/cmd/config/vm_config_template.go b/cmd/config/vm_config_template.go index 4bb5e75f..d2b6e467 100644 --- a/cmd/config/vm_config_template.go +++ b/cmd/config/vm_config_template.go @@ -29,18 +29,10 @@ vm_data_listen = "{{ .DataListen }}" # VM retry settings. ## Retry max attempts. -## Default is 0 - infinity attempts, -1 - to disable. +## Default is 0 - infinity attempts. vm_retry_max_attempts = {{ .MaxAttempts }} -## Initial backoff in ms. -## Default is 100ms. -vm_retry_initial_backoff = {{ .InitialBackoff }} - -## Max backoff in ms. -## Default is 150ms. -vm_retry_max_backoff = {{ .MaxBackoff }} - -## Backoff multiplier. -## Default is 0.1 -vm_retry_backoff_multiplier = {{ .BackoffMultiplier }} +## Request timeout per attempt in ms. +## Default is 0 - infinite (no timeout). +vm_retry_req_timeout_ms = {{ .ReqTimeoutInMs }} ` diff --git a/cmd/dncli/docs/swagger.go b/cmd/dncli/docs/swagger.go index 5828c9dd..f0cc6454 100644 --- a/cmd/dncli/docs/swagger.go +++ b/cmd/dncli/docs/swagger.go @@ -752,6 +752,28 @@ definitions: format: HEX encoded byte code type: string type: object + rest.LcsViewReq: + properties: + address: + description: Resource address + example: "0x0000000000000000000000000000000000000001" + format: bech32/hex + type: string + move_path: + description: Move formatted path (ModuleName::StructName, where ::StructName is optional) + example: Block::BlockMetadata + type: string + view_request: + description: LCS view JSON formatted request (refer to docs for specs) + example: '[ { "name": "height", "type": "U64" } ]' + type: string + type: object + rest.LcsViewResp: + properties: + value: + format: JSON + type: string + type: object rest.MSRespGetCall: properties: height: @@ -998,6 +1020,14 @@ definitions: $ref: '#/definitions/vm_client.MoveFile' type: object type: object + rest.VmRespLcsView: + properties: + height: + type: integer + result: + $ref: '#/definitions/rest.LcsViewResp' + type: object + type: object rest.VmRespStdTx: properties: height: @@ -1034,8 +1064,6 @@ definitions: items: type: integer type: array - types.Address: - type: object types.Asset: properties: active: @@ -1134,6 +1162,24 @@ definitions: items: $ref: '#/definitions/types.Coin' type: array + types.Commission: + properties: + max_change_rate: + $ref: '#/definitions/types.Dec' + description: maximum daily increase of the validator commission, as a fraction + type: object + max_rate: + $ref: '#/definitions/types.Dec' + description: maximum commission rate which validator can ever charge, as a fraction + type: object + rate: + $ref: '#/definitions/types.Dec' + description: the commission rate charged to delegators, as a fraction + type: object + update_time: + description: the last time the commission rate was changed + type: string + type: object types.Currencies: items: $ref: '#/definitions/types.Currency' @@ -1189,6 +1235,24 @@ definitions: items: $ref: '#/definitions/types.DecCoin' type: array + types.Description: + properties: + details: + description: optional details + type: string + identity: + description: optional identity signature (ex. UPort or Keybase) + type: string + moniker: + description: name + type: string + security_contact: + description: optional security contact info + type: string + website: + description: optional website link + type: string + type: object types.ID: $ref: '#/definitions/sdk.Uint' types.Int: @@ -1413,18 +1477,52 @@ definitions: items: $ref: '#/definitions/types.VMStatus' type: array + types.ValAddress: + items: + type: integer + type: array types.Validator: properties: - address: - $ref: '#/definitions/types.Address' + commission: + $ref: '#/definitions/types.Commission' + description: commission parameters type: object - proposer_priority: - type: integer - pub_key: + consensus_pubkey: $ref: '#/definitions/crypto.PubKey' + description: the consensus public key of the validator; bech encoded in JSON type: object - voting_power: + delegator_shares: + $ref: '#/definitions/types.Dec' + description: total shares issued to a validator's delegators + type: object + description: + $ref: '#/definitions/types.Description' + description: description terms for the validator + type: object + jailed: + description: has the validator been jailed from bonded status? + type: boolean + min_self_delegation: + $ref: '#/definitions/types.Int' + description: validator's self declared minimum self delegation + type: object + operator_address: + $ref: '#/definitions/types.ValAddress' + description: address of the validator's operator; bech encoded in JSON + type: object + status: + description: validator status (bonded/unbonding/unbonded) + type: string + tokens: + $ref: '#/definitions/types.Int' + description: delegated tokens (incl. self-delegation) + type: object + unbonding_height: + description: if unbonding, height at which this validator has begun unbonding type: integer + unbonding_time: + description: if unbonding, min time for the validator to complete unbonding + type: string type: object types.Validators: items: @@ -4218,6 +4316,37 @@ paths: summary: Get TX VM execution status tags: - VM + /vm/view: + get: + consumes: + - application/json + description: Get writeSet data LCS string view for {address}::{moduleName}::{structName} Move path" + operationId: vmGetData + parameters: + - description: View request + in: body + name: request + required: true + schema: + $ref: '#/definitions/rest.LcsViewReq' + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/rest.VmRespLcsView' + "400": + description: Returned if the request doesn't have valid query params + schema: + $ref: '#/definitions/rest.ErrorResponse' + "500": + description: Returned on server error + schema: + $ref: '#/definitions/rest.ErrorResponse' + summary: Get writeSet data from VM LCS string view + tags: + - VM schemes: - https securityDefinitions: diff --git a/helpers/tests/clitester/cli_tester_configs.go b/helpers/tests/clitester/cli_tester_configs.go index c6c674f7..72b1bb83 100644 --- a/helpers/tests/clitester/cli_tester_configs.go +++ b/helpers/tests/clitester/cli_tester_configs.go @@ -273,16 +273,14 @@ func NewTestVMConnectionConfigTCP() (c VMConnectionConfig, retErr error) { } type VMCommunicationConfig struct { - MinBackoffMs int - MaxBackoffMs int - MaxAttempts int + MaxAttempts uint + ReqTimeoutInMs uint } func NewTestVMCommunicationConfig() VMCommunicationConfig { return VMCommunicationConfig{ - MinBackoffMs: 100, - MaxBackoffMs: 150, - MaxAttempts: 1, + MaxAttempts: 1, + ReqTimeoutInMs: 5000, } } diff --git a/helpers/tests/clitester/cli_tester_init.go b/helpers/tests/clitester/cli_tester_init.go index 308f17ae..386162bd 100644 --- a/helpers/tests/clitester/cli_tester_init.go +++ b/helpers/tests/clitester/cli_tester_init.go @@ -230,9 +230,8 @@ func (ct *CLITester) initChain() { { vmConfig := dnConfig.DefaultVMConfig() vmConfig.Address, vmConfig.DataListen = ct.VMConnection.ConnectAddress, ct.VMConnection.ListenAddress - vmConfig.InitialBackoff = ct.VMCommunication.MinBackoffMs - vmConfig.MaxBackoff = ct.VMCommunication.MaxBackoffMs vmConfig.MaxAttempts = ct.VMCommunication.MaxAttempts + vmConfig.ReqTimeoutInMs = ct.VMCommunication.ReqTimeoutInMs dnConfig.WriteVMConfig(ct.Dirs.RootDir, vmConfig) } } diff --git a/helpers/tests/clitester/cli_tester_options.go b/helpers/tests/clitester/cli_tester_options.go index 9a52f6d8..a40e6000 100644 --- a/helpers/tests/clitester/cli_tester_options.go +++ b/helpers/tests/clitester/cli_tester_options.go @@ -51,11 +51,10 @@ func BinaryPathsOptions(dnodePath, dncliPath string) CLITesterOption { } } -func VMCommunicationOption(minBackoffMs, maxBackoffMs, maxAttempts int) CLITesterOption { +func VMCommunicationOption(maxAttempts, reqTimeoutInMs uint) CLITesterOption { return func(ct *CLITester) error { - ct.VMCommunication.MinBackoffMs = minBackoffMs - ct.VMCommunication.MaxBackoffMs = maxBackoffMs ct.VMCommunication.MaxAttempts = maxAttempts + ct.VMCommunication.ReqTimeoutInMs = reqTimeoutInMs return nil } diff --git a/helpers/tests/mockdvm/mockdvm.go b/helpers/tests/mockdvm/mockdvm.go new file mode 100644 index 00000000..920d4859 --- /dev/null +++ b/helpers/tests/mockdvm/mockdvm.go @@ -0,0 +1,85 @@ +package mockdvm + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/dfinance/dvm-proto/go/vm_grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpcStatus "google.golang.org/grpc/status" +) + +type MockDVM struct { + sync.Mutex + server *grpc.Server + failExecution bool + failResponse bool + failCountdown uint + execDelay time.Duration +} + +func (s *MockDVM) SetExecutionFail() { s.failExecution = true } +func (s *MockDVM) SetExecutionOK() { s.failExecution = false } +func (s *MockDVM) SetResponseFail() { s.failResponse = true } +func (s *MockDVM) SetResponseOK() { s.failResponse = false } +func (s *MockDVM) SetExecutionDelay(dur time.Duration) { + s.execDelay = dur +} +func (s *MockDVM) SetSequentialFailingCount(cnt uint) { + s.failCountdown = cnt +} +func (s *MockDVM) Stop() { + if s.server != nil { + s.server.Stop() + } +} + +func (s *MockDVM) PublishModule(ctx context.Context, in *vm_grpc.VMPublishModule) (*vm_grpc.VMExecuteResponse, error) { + s.Lock() + defer s.Unlock() + + time.Sleep(s.execDelay) + + if s.failExecution || s.failCountdown > 0 { + if s.failCountdown > 0 { + s.failCountdown-- + } + + return nil, grpcStatus.Errorf(codes.Internal, "failing gRPC execution") + } + + resp := &vm_grpc.VMExecuteResponse{} + if !s.failResponse { + resp = &vm_grpc.VMExecuteResponse{ + WriteSet: nil, + Events: nil, + GasUsed: 1, + Status: vm_grpc.ContractStatus_Discard, + StatusStruct: nil, + } + } + + return resp, nil +} + +func StartMockDVMService(listener net.Listener) *MockDVM { + s := &MockDVM{ + execDelay: 100 * time.Millisecond, + } + + server := grpc.NewServer() + vm_grpc.RegisterVMModulePublisherServer(server, s) + + go func() { + if err := server.Serve(listener); err != nil { + fmt.Printf("MockDVM serve: %v\n", err) + } + }() + s.server = server + + return s +} diff --git a/x/oracle/internal/types/price_test.go b/x/oracle/internal/types/price_test.go index 4b74f385..01b8b083 100644 --- a/x/oracle/internal/types/price_test.go +++ b/x/oracle/internal/types/price_test.go @@ -1,3 +1,5 @@ +// +build unit + package types import ( diff --git a/x/vm/internal/keeper/common_test.go b/x/vm/internal/keeper/common_test.go index 6ff8bb33..e42b8e90 100644 --- a/x/vm/internal/keeper/common_test.go +++ b/x/vm/internal/keeper/common_test.go @@ -343,12 +343,10 @@ func newTestInput(launchMock bool) testInput { func newMockVMConfig() *vmConfig.VMConfig { return &vmConfig.VMConfig{ - Address: *vmMockAddress, - DataListen: *dataListenMock, - MaxAttempts: vmConfig.DefaultMaxAttempts, - InitialBackoff: vmConfig.DefaultInitialBackoff, - MaxBackoff: vmConfig.DefaultMaxBackoff, - BackoffMultiplier: vmConfig.DefaultBackoffMultiplier, + Address: *vmMockAddress, + DataListen: *dataListenMock, + MaxAttempts: vmConfig.DefaultMaxAttempts, + ReqTimeoutInMs: vmConfig.DefaultReqTimeout, } } diff --git a/x/vm/internal/keeper/keeper_ds.go b/x/vm/internal/keeper/keeper_ds.go index fbfa42db..92e02589 100644 --- a/x/vm/internal/keeper/keeper_ds.go +++ b/x/vm/internal/keeper/keeper_ds.go @@ -3,7 +3,6 @@ package keeper import ( "context" "fmt" - "math" "time" sdk "github.com/cosmos/cosmos-sdk/types" @@ -14,11 +13,14 @@ import ( // RetryExecReq contains VM "execution" request meta (request details and retry settings). type RetryExecReq struct { - RawModule *vm_grpc.VMPublishModule // Request to retry (module publish). - RawScript *vm_grpc.VMExecuteScript // Request to retry (script execution) - Attempt int // Current attempt. - CurrentTimeout int // Current timeout. - MaxAttempts int // Max attempts. + // Request to retry (module publish). + RawModule *vm_grpc.VMPublishModule + // Request to retry (script execution) + RawScript *vm_grpc.VMExecuteScript + // Max number of request attempts (0 - infinite) + MaxAttempts uint + // Request timeout per attempt (0 - infinite) [ms] + ReqTimeoutInMs uint } // StartDSServer starts DataSource server. @@ -58,69 +60,65 @@ func (k Keeper) CloseConnections() { // Contract: either RawModule or RawScript must be specified for RetryExecReq. func (k Keeper) retryExecReq(ctx sdk.Context, req RetryExecReq) (retResp *vm_grpc.VMExecuteResponse, retErr error) { doneCh := make(chan bool) - go func() { - var cancelCtx func() - stopPrevCtx := func() { - if cancelCtx != nil { - cancelCtx() - } - } + curAttempt := uint(0) + reqTimeout := time.Duration(req.ReqTimeoutInMs) * time.Millisecond + reqStartedAt := time.Now() + go func() { defer func() { close(doneCh) - stopPrevCtx() }() for { - stopPrevCtx() - curTimeout := time.Duration(req.CurrentTimeout) * time.Millisecond - connCtx, cancelFunc := context.WithTimeout(context.Background(), curTimeout) - cancelCtx = cancelFunc - - connStartedAt := time.Now() + var connCtx context.Context + var connCancel context.CancelFunc var resp *vm_grpc.VMExecuteResponse var err error + + curAttempt++ + + connCtx = context.Background() + if reqTimeout > 0 { + connCtx, connCancel = context.WithTimeout(context.Background(), reqTimeout) + } + + curReqStartedAt := time.Now() if req.RawModule != nil { resp, err = k.client.PublishModule(connCtx, req.RawModule) } else if req.RawScript != nil { resp, err = k.client.ExecuteScript(connCtx, req.RawScript) } + if connCancel != nil { + connCancel() + } + curReqDur := time.Since(curReqStartedAt) + + if err == nil { + retResp, retErr = resp, nil + return + } - connDuration := time.Since(connStartedAt) - if err != nil { - if req.Attempt == 0 { - // write to Sentry (if enabled) - k.GetLogger(ctx).Error(fmt.Sprintf("Can't get answer from VM in %v, will try to reconnect in %s attempts: %v", req.CurrentTimeout, getMaxAttemptsStr(req.MaxAttempts), err)) - } - req.Attempt += 1 - - if req.MaxAttempts != 0 && req.Attempt == req.MaxAttempts { - // return error because of max attempts. - logErr := fmt.Errorf("max %d attemps reached, can't get answer from VM: %v", req.Attempt, err) - k.GetLogger(ctx).Error(logErr.Error()) - retErr = logErr - return - } - - if curTimeout > connDuration { - time.Sleep(curTimeout - connDuration) - } - - req.CurrentTimeout += int(math.Round(float64(req.CurrentTimeout) * k.config.BackoffMultiplier)) - if req.CurrentTimeout > k.config.MaxBackoff { - req.CurrentTimeout = k.config.MaxBackoff - } - - continue + if curAttempt == req.MaxAttempts { + retResp, retErr = nil, err + return } - k.GetLogger(ctx).Info(fmt.Sprintf("Successfully connected to VM with %v timeout in %d attempts", req.CurrentTimeout, req.Attempt)) - retResp = resp - return + if curReqDur < reqTimeout { + time.Sleep(reqTimeout - curReqDur) + } } }() <-doneCh + reqDur := time.Since(reqStartedAt) + msg := fmt.Sprintf("in %d attempt(s) with %v timeout (%v)", curAttempt, reqTimeout, reqDur) + if retErr == nil { + k.GetLogger(ctx).Info(fmt.Sprintf("Successfull VM request (%s)", msg)) + } else { + k.GetLogger(ctx).Error(fmt.Sprintf("Failed VM request (%s): %v", msg, retErr)) + retErr = fmt.Errorf("%s: %w", msg, retErr) + } + return } @@ -136,13 +134,8 @@ func (k Keeper) sendExecuteReq(ctx sdk.Context, moduleReq *vm_grpc.VMPublishModu retryReq := RetryExecReq{ RawModule: moduleReq, RawScript: scriptReq, - CurrentTimeout: k.config.InitialBackoff, MaxAttempts: k.config.MaxAttempts, - } - - if k.config.MaxAttempts < 0 { - // just send, in case of error - return error and panic. - retryReq.MaxAttempts = 1 + ReqTimeoutInMs: k.config.ReqTimeoutInMs, } return k.retryExecReq(ctx, retryReq) diff --git a/x/vm/internal/keeper/keeper_ds_test.go b/x/vm/internal/keeper/keeper_ds_test.go new file mode 100644 index 00000000..cd057622 --- /dev/null +++ b/x/vm/internal/keeper/keeper_ds_test.go @@ -0,0 +1,145 @@ +// +build integ + +package keeper + +import ( + "testing" + "time" + + "github.com/cosmos/cosmos-sdk/server" + "github.com/stretchr/testify/require" + + "github.com/dfinance/dnode/helpers" + "github.com/dfinance/dnode/helpers/tests/mockdvm" + "github.com/dfinance/dnode/x/common_vm" + "github.com/dfinance/dnode/x/vm/internal/types" +) + +func TestVMKeeper_RetryMechanism(t *testing.T) { + t.Parallel() + + input := newTestInput(true) + defer input.Stop() + ctx, keeper := input.ctx, input.vk + + // start mockDVM gRPC server (module publisher) + listenerAddr, _, err := server.FreeTCPAddr() + require.NoError(t, err, "geting free TCP port for MockDVM listener") + + mockDvmListener, err := helpers.GetGRpcNetListener(listenerAddr) + require.NoError(t, err, "creating MockDVM listener") + + mockDvmServer := mockdvm.StartMockDVMService(mockDvmListener) + defer mockDvmServer.Stop() + + // create mockDVM gRPC client and rewrite test keeper's one + mockDvmCLient, err := helpers.GetGRpcClientConnection(listenerAddr, 1*time.Second) + require.NoError(t, err, "creating MockDVM client") + keeper.rawClient = mockDvmCLient + keeper.client = NewVMClient(mockDvmCLient) + + deployReq, err := NewDeployRequest(ctx, types.MsgDeployModule{ + Signer: common_vm.StdLibAddress, + Module: []byte{0x01, 0x02, 0x03, 0x04, 0x05}, + }) + require.NoError(t, err, "creating deployRequest") + + // ok: in one attempt (infinite settings) + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 0, 0 + + mockDvmServer.SetExecutionOK() + mockDvmServer.SetResponseOK() + mockDvmServer.SetExecutionDelay(50 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.NoError(t, err) + } + + // ok: in one attempt (settings with limit) + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 1, 100 + + mockDvmServer.SetExecutionOK() + mockDvmServer.SetResponseOK() + mockDvmServer.SetExecutionDelay(50 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.NoError(t, err) + } + + // ok: in one attempt (without request timeout) + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 1, 0 + + mockDvmServer.SetExecutionOK() + mockDvmServer.SetResponseOK() + mockDvmServer.SetExecutionDelay(500 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.NoError(t, err) + } + + // ok: in multiple attempts (with request timeout) + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 10, 200 + + mockDvmServer.SetExecutionOK() + mockDvmServer.SetResponseOK() + mockDvmServer.SetSequentialFailingCount(5) + mockDvmServer.SetExecutionDelay(100 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.NoError(t, err) + } + + // ok: in multiple attempts (without request timeout) + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 10, 0 + + mockDvmServer.SetExecutionOK() + mockDvmServer.SetResponseOK() + mockDvmServer.SetSequentialFailingCount(5) + mockDvmServer.SetExecutionDelay(100 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.NoError(t, err) + } + + // ok: in one attempt with long response (without limits) + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 0, 0 + + mockDvmServer.SetExecutionOK() + mockDvmServer.SetResponseOK() + mockDvmServer.SetExecutionDelay(3000 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.NoError(t, err) + } + + // fail: by timeout (deadline) + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 5, 100 + + mockDvmServer.SetExecutionOK() + mockDvmServer.SetResponseOK() + mockDvmServer.SetExecutionDelay(200 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "context deadline exceeded") + } + + // fail: by attempts + { + keeper.config.MaxAttempts, keeper.config.ReqTimeoutInMs = 5, 100 + + mockDvmServer.SetExecutionFail() + mockDvmServer.SetExecutionDelay(50 * time.Millisecond) + + _, err := keeper.sendExecuteReq(ctx, deployReq, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "failing gRPC execution") + } +}