Skip to content

Commit

Permalink
Merge pull request #6192 from multiversx/system-vm-critical-section
Browse files Browse the repository at this point in the history
add system vm critical section
  • Loading branch information
AdoAdoAdo authored May 24, 2024
2 parents f7efd0f + a23d856 commit 2a6916f
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 24 deletions.
19 changes: 18 additions & 1 deletion epochStart/metachain/stakingDataProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,24 @@ func (sdp *stakingDataProvider) GetCurrentEpochValidatorStats() epochStart.Valid
sdp.mutStakingData.RLock()
defer sdp.mutStakingData.RUnlock()

return sdp.validatorStatsInEpoch
return copyValidatorStatsInEpoch(sdp.validatorStatsInEpoch)
}

func copyValidatorStatsInEpoch(oldInstance epochStart.ValidatorStatsInEpoch) epochStart.ValidatorStatsInEpoch {
return epochStart.ValidatorStatsInEpoch{
Eligible: copyMap(oldInstance.Eligible),
Waiting: copyMap(oldInstance.Waiting),
Leaving: copyMap(oldInstance.Leaving),
}
}

func copyMap(oldMap map[uint32]int) map[uint32]int {
newMap := make(map[uint32]int, len(oldMap))
for key, value := range oldMap {
newMap[key] = value
}

return newMap
}

// IsInterfaceNil return true if underlying object is nil
Expand Down
59 changes: 51 additions & 8 deletions epochStart/metachain/stakingDataProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"testing"

"github.com/multiversx/mx-chain-core-go/core"
Expand Down Expand Up @@ -465,16 +466,58 @@ func TestStakingDataProvider_PrepareStakingDataForRewards(t *testing.T) {
func TestStakingDataProvider_FillValidatorInfo(t *testing.T) {
t.Parallel()

owner := []byte("owner")
topUpVal := big.NewInt(828743)
basePrice := big.NewInt(100000)
stakeVal := big.NewInt(0).Add(topUpVal, basePrice)
numRunContractCalls := 0
t.Run("should work", func(t *testing.T) {
t.Parallel()

sdp := createStakingDataProviderWithMockArgs(t, owner, topUpVal, stakeVal, &numRunContractCalls)
owner := []byte("owner")
topUpVal := big.NewInt(828743)
basePrice := big.NewInt(100000)
stakeVal := big.NewInt(0).Add(topUpVal, basePrice)
numRunContractCalls := 0

err := sdp.FillValidatorInfo(&state.ValidatorInfo{PublicKey: []byte("bls key")})
require.NoError(t, err)
sdp := createStakingDataProviderWithMockArgs(t, owner, topUpVal, stakeVal, &numRunContractCalls)

err := sdp.FillValidatorInfo(&state.ValidatorInfo{PublicKey: []byte("bls key")})
require.NoError(t, err)
})
t.Run("concurrent calls should work", func(t *testing.T) {
t.Parallel()

owner := []byte("owner")
topUpVal := big.NewInt(828743)
basePrice := big.NewInt(100000)
stakeVal := big.NewInt(0).Add(topUpVal, basePrice)
numRunContractCalls := 0

sdp := createStakingDataProviderWithMockArgs(t, owner, topUpVal, stakeVal, &numRunContractCalls)

wg := sync.WaitGroup{}
numCalls := 100
wg.Add(numCalls)

require.NotPanics(t, func() {
for i := 0; i < numCalls; i++ {
go func(idx int) {
switch idx % 2 {
case 0:
err := sdp.FillValidatorInfo(&state.ValidatorInfo{
PublicKey: []byte("bls key"),
List: string(common.EligibleList),
ShardId: 0,
})
require.NoError(t, err)
case 1:
stats := sdp.GetCurrentEpochValidatorStats()
log.Info(fmt.Sprintf("%d", stats.Eligible[0]))
}

wg.Done()
}(i)
}

wg.Wait()
})
})
}

func TestCheckAndFillOwnerValidatorAuctionData(t *testing.T) {
Expand Down
19 changes: 13 additions & 6 deletions process/peer/validatorsProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ func (vp *validatorsProvider) GetLatestValidators() map[string]*validator.Valida
}

func (vp *validatorsProvider) updateCacheIfNeeded() {
vp.lock.RLock()
vp.lock.Lock()
defer vp.lock.Unlock()

shouldUpdate := time.Since(vp.lastCacheUpdate) > vp.cacheRefreshIntervalDuration
vp.lock.RUnlock()

if shouldUpdate {
vp.updateCache()
Expand Down Expand Up @@ -192,7 +193,10 @@ func (vp *validatorsProvider) epochStartEventHandler() nodesCoordinator.EpochSta

func (vp *validatorsProvider) startRefreshProcess(ctx context.Context) {
for {
vp.lock.Lock()
vp.updateCache()
vp.lock.Unlock()

select {
case epoch := <-vp.refreshCache:
vp.lock.Lock()
Expand All @@ -206,6 +210,7 @@ func (vp *validatorsProvider) startRefreshProcess(ctx context.Context) {
}
}

// this func should be called under mutex protection
func (vp *validatorsProvider) updateCache() {
lastFinalizedRootHash := vp.validatorStatistics.LastFinalizedRootHash()
if len(lastFinalizedRootHash) == 0 {
Expand All @@ -217,16 +222,12 @@ func (vp *validatorsProvider) updateCache() {
log.Trace("validatorsProvider - GetLatestValidatorInfos failed", "error", err)
}

vp.lock.RLock()
epoch := vp.currentEpoch
vp.lock.RUnlock()

newCache := vp.createNewCache(epoch, allNodes)

vp.lock.Lock()
vp.lastCacheUpdate = time.Now()
vp.cache = newCache
vp.lock.Unlock()
}

func (vp *validatorsProvider) createNewCache(
Expand Down Expand Up @@ -319,7 +320,13 @@ func shouldCombine(triePeerType common.PeerType, currentPeerType common.PeerType

// ForceUpdate will trigger the update process of all caches
func (vp *validatorsProvider) ForceUpdate() error {
vp.lock.Lock()
vp.updateCache()
vp.lock.Unlock()

vp.auctionMutex.Lock()
defer vp.auctionMutex.Unlock()

return vp.updateAuctionListCache()
}

Expand Down
13 changes: 5 additions & 8 deletions process/peer/validatorsProviderAuction.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ func (vp *validatorsProvider) GetAuctionList() ([]*common.AuctionListValidatorAP
}

func (vp *validatorsProvider) updateAuctionListCacheIfNeeded() error {
vp.auctionMutex.RLock()
vp.auctionMutex.Lock()
defer vp.auctionMutex.Unlock()

shouldUpdate := time.Since(vp.lastAuctionCacheUpdate) > vp.cacheRefreshIntervalDuration
vp.auctionMutex.RUnlock()

if shouldUpdate {
return vp.updateAuctionListCache()
Expand All @@ -38,6 +39,7 @@ func (vp *validatorsProvider) updateAuctionListCacheIfNeeded() error {
return nil
}

// this func should be called under mutex protection
func (vp *validatorsProvider) updateAuctionListCache() error {
rootHash := vp.validatorStatistics.LastFinalizedRootHash()
if len(rootHash) == 0 {
Expand All @@ -49,19 +51,15 @@ func (vp *validatorsProvider) updateAuctionListCache() error {
return err
}

vp.auctionMutex.Lock()
vp.cachedRandomness = rootHash
vp.auctionMutex.Unlock()

newCache, err := vp.createValidatorsAuctionCache(validatorsMap)
if err != nil {
return err
}

vp.auctionMutex.Lock()
vp.lastAuctionCacheUpdate = time.Now()
vp.cachedAuctionValidators = newCache
vp.auctionMutex.Unlock()

return nil
}
Expand Down Expand Up @@ -96,10 +94,9 @@ func (vp *validatorsProvider) fillAllValidatorsInfo(validatorsMap state.ShardVal
return err
}

// this func should be called under mutex protection
func (vp *validatorsProvider) getSelectedNodesFromAuction(validatorsMap state.ShardValidatorsInfoMapHandler) ([]state.ValidatorInfoHandler, error) {
vp.auctionMutex.RLock()
randomness := vp.cachedRandomness
vp.auctionMutex.RUnlock()

err := vp.auctionListSelector.SelectNodesFromAuctionList(validatorsMap, randomness)
if err != nil {
Expand Down
83 changes: 83 additions & 0 deletions process/peer/validatorsProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,89 @@ func TestValidatorsProvider_GetAuctionList(t *testing.T) {
require.Equal(t, expectedList, list)
})

t.Run("concurrent calls should only update cache once", func(t *testing.T) {
t.Parallel()

args := createDefaultValidatorsProviderArg()

args.CacheRefreshIntervalDurationInSec = time.Second * 5

expectedRootHash := []byte("root hash")
ctRootHashCalled := uint32(0)
ctSelectNodesFromAuctionList := uint32(0)
ctFillValidatorInfoCalled := uint32(0)
ctGetOwnersDataCalled := uint32(0)
ctComputeUnqualifiedNodes := uint32(0)

args.ValidatorStatistics = &testscommon.ValidatorStatisticsProcessorStub{
LastFinalizedRootHashCalled: func() []byte {
atomic.AddUint32(&ctRootHashCalled, 1)
return expectedRootHash
},
GetValidatorInfoForRootHashCalled: func(rootHash []byte) (state.ShardValidatorsInfoMapHandler, error) {
require.Equal(t, expectedRootHash, rootHash)
return state.NewShardValidatorsInfoMap(), nil
},
}
args.AuctionListSelector = &stakingcommon.AuctionListSelectorStub{
SelectNodesFromAuctionListCalled: func(validatorsInfoMap state.ShardValidatorsInfoMapHandler, randomness []byte) error {
atomic.AddUint32(&ctSelectNodesFromAuctionList, 1)
require.Equal(t, expectedRootHash, randomness)
return nil
},
}
args.StakingDataProvider = &stakingcommon.StakingDataProviderStub{
FillValidatorInfoCalled: func(validator state.ValidatorInfoHandler) error {
atomic.AddUint32(&ctFillValidatorInfoCalled, 1)
return nil
},
GetOwnersDataCalled: func() map[string]*epochStart.OwnerData {
atomic.AddUint32(&ctGetOwnersDataCalled, 1)
return nil
},
ComputeUnQualifiedNodesCalled: func(validatorInfos state.ShardValidatorsInfoMapHandler) ([][]byte, map[string][][]byte, error) {
atomic.AddUint32(&ctComputeUnqualifiedNodes, 1)
return nil, nil, nil
},
}
vp, _ := NewValidatorsProvider(args)
time.Sleep(args.CacheRefreshIntervalDurationInSec)

numCalls := 99
wg := sync.WaitGroup{}
wg.Add(numCalls)

for i := 0; i < numCalls; i++ {
go func(idx int) {
switch idx % 3 {
case 0:
list, err := vp.GetAuctionList()
require.NoError(t, err)
require.Empty(t, list)
case 1:
err := vp.ForceUpdate()
require.NoError(t, err)
case 2:
_ = vp.GetLatestValidators()
}

wg.Done()
}(i)
}

wg.Wait()

// expectedMaxNumCalls is:
// - 1 from constructor
// - 1 from GetAuctionList, should not update second time
// - 1 from GetLatestValidators, should not update second time
// - 33 calls * 2 from ForceUpdate, calling it twice/call
expectedMaxNumCalls := uint32(1 + 1 + 1 + 66)
require.LessOrEqual(t, ctRootHashCalled, expectedMaxNumCalls)

require.NoError(t, vp.Close())
})

}

func createMockValidatorInfo() *state.ValidatorInfo {
Expand Down
10 changes: 9 additions & 1 deletion vm/process/systemVM.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"

"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/vm"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
)

type systemVM struct {
Expand All @@ -18,6 +19,7 @@ type systemVM struct {
asyncCallbackGasLock uint64
asyncCallStepCost uint64
mutGasLock sync.RWMutex
criticalSection sync.Mutex
}

// ArgsNewSystemVM defines the needed arguments to create a new system vm
Expand Down Expand Up @@ -68,6 +70,9 @@ func NewSystemVM(args ArgsNewSystemVM) (*systemVM, error) {

// RunSmartContractCreate creates and saves a new smart contract to the trie
func (s *systemVM) RunSmartContractCreate(input *vmcommon.ContractCreateInput) (*vmcommon.VMOutput, error) {
s.criticalSection.Lock()
defer s.criticalSection.Unlock()

if input == nil {
return nil, vm.ErrInputArgsIsNil
}
Expand Down Expand Up @@ -101,6 +106,9 @@ func (s *systemVM) RunSmartContractCreate(input *vmcommon.ContractCreateInput) (

// RunSmartContractCall executes a smart contract according to the input
func (s *systemVM) RunSmartContractCall(input *vmcommon.ContractCallInput) (*vmcommon.VMOutput, error) {
s.criticalSection.Lock()
defer s.criticalSection.Unlock()

s.systemEI.CleanCache()
s.systemEI.SetSCAddress(input.RecipientAddr)
s.systemEI.AddTxValueToSmartContract(input.CallValue, input.RecipientAddr)
Expand Down

0 comments on commit 2a6916f

Please sign in to comment.