Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map counters holder component #394

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions observer/availabilityCommon/availabilityProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,8 @@ func (ap *AvailabilityProvider) GetDescriptionForAvailability(availability data.
return "N/A"
}
}

// GetAllAvailabilityTypes returns all data availability types
func (ap *AvailabilityProvider) GetAllAvailabilityTypes() []data.ObserverDataAvailabilityType {
return []data.ObserverDataAvailabilityType{data.AvailabilityAll, data.AvailabilityRecent}
}
15 changes: 15 additions & 0 deletions observer/availabilityCommon/availabilityProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
)

func TestAvailabilityForAccountQueryOptions(t *testing.T) {
t.Parallel()

ap := &AvailabilityProvider{}

// Test with historical coordinates set
Expand All @@ -22,6 +24,8 @@ func TestAvailabilityForAccountQueryOptions(t *testing.T) {
}

func TestAvailabilityForVmQuery(t *testing.T) {
t.Parallel()

ap := &AvailabilityProvider{}

// Test with BlockNonce set
Expand All @@ -38,6 +42,8 @@ func TestAvailabilityForVmQuery(t *testing.T) {
}

func TestIsNodeValid(t *testing.T) {
t.Parallel()

ap := &AvailabilityProvider{}

// Test with AvailabilityRecent and snapshotless node
Expand All @@ -58,9 +64,18 @@ func TestIsNodeValid(t *testing.T) {
}

func TestGetDescriptionForAvailability(t *testing.T) {
t.Parallel()

ap := &AvailabilityProvider{}

require.Equal(t, "regular nodes", ap.GetDescriptionForAvailability(data.AvailabilityAll))
require.Equal(t, "snapshotless nodes", ap.GetDescriptionForAvailability(data.AvailabilityRecent))
require.Equal(t, "N/A", ap.GetDescriptionForAvailability("invalid")) // Invalid value
}

func TestAvailabilityProvider_GetAllAvailabilityTypes(t *testing.T) {
t.Parallel()

ap := &AvailabilityProvider{}
require.Equal(t, []data.ObserverDataAvailabilityType{data.AvailabilityAll, data.AvailabilityRecent}, ap.GetAllAvailabilityTypes())
}
45 changes: 14 additions & 31 deletions observer/circularQueueNodesProvider.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package observer

import (
"sync"

"github.com/multiversx/mx-chain-proxy-go/data"
"github.com/multiversx/mx-chain-proxy-go/observer/mapCounters"
)

// circularQueueNodesProvider will handle the providing of observers in a circular queue way, guaranteeing the
// balancing of them
type circularQueueNodesProvider struct {
*baseNodeProvider
countersMap map[uint32]uint32
counterForAllNodes uint32
mutCounters sync.RWMutex
positionsHolder CounterMapsHolder
}

// NewCircularQueueNodesProvider returns a new instance of circularQueueNodesProvider
Expand All @@ -26,11 +23,9 @@ func NewCircularQueueNodesProvider(observers []*data.NodeData, configurationFile
return nil, err
}

countersMap := make(map[uint32]uint32)
return &circularQueueNodesProvider{
baseNodeProvider: bop,
countersMap: countersMap,
counterForAllNodes: 0,
baseNodeProvider: bop,
positionsHolder: mapCounters.NewMapCountersHolder(),
}, nil
}

Expand All @@ -44,7 +39,11 @@ func (cqnp *circularQueueNodesProvider) GetNodesByShardId(shardId uint32, dataAv
return nil, err
}

position := cqnp.computeCounterForShard(shardId, uint32(len(syncedNodesForShard)))
position, err := cqnp.positionsHolder.ComputeShardPosition(dataAvailability, shardId, uint32(len(syncedNodesForShard)))
if err != nil {
return nil, err
}

sliceToRet := append(syncedNodesForShard[position:], syncedNodesForShard[:position]...)

return sliceToRet, nil
Expand All @@ -60,32 +59,16 @@ func (cqnp *circularQueueNodesProvider) GetAllNodes(dataAvailability data.Observ
return nil, err
}

position := cqnp.computeCounterForAllNodes(uint32(len(allNodes)))
position, err := cqnp.positionsHolder.ComputeAllNodesPosition(dataAvailability, uint32(len(allNodes)))
if err != nil {
return nil, err
}

sliceToRet := append(allNodes[position:], allNodes[:position]...)

return sliceToRet, nil
}

func (cqnp *circularQueueNodesProvider) computeCounterForShard(shardID uint32, lenNodes uint32) uint32 {
cqnp.mutCounters.Lock()
defer cqnp.mutCounters.Unlock()

cqnp.countersMap[shardID]++
cqnp.countersMap[shardID] %= lenNodes

return cqnp.countersMap[shardID]
}

func (cqnp *circularQueueNodesProvider) computeCounterForAllNodes(lenNodes uint32) uint32 {
cqnp.mutCounters.Lock()
defer cqnp.mutCounters.Unlock()

cqnp.counterForAllNodes++
cqnp.counterForAllNodes %= lenNodes

return cqnp.counterForAllNodes
}

// IsInterfaceNil returns true if there is no value under the interface
func (cqnp *circularQueueNodesProvider) IsInterfaceNil() bool {
return cqnp == nil
Expand Down
26 changes: 13 additions & 13 deletions observer/circularQueueNodesProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldWork(t *testi
cfg := getDummyConfig()
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")

res, err := cqop.GetNodesByShardId(shardId, "")
res, err := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
assert.Nil(t, err)
assert.Equal(t, 1, len(res))
}
Expand All @@ -79,14 +79,14 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldBalanceObserv
}
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")

res1, _ := cqop.GetNodesByShardId(shardId, "")
res2, _ := cqop.GetNodesByShardId(shardId, "")
res1, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
res2, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
assert.NotEqual(t, res1, res2)

// there are 3 observers. so after 3 steps, the queue should be the same as the original
_, _ = cqop.GetNodesByShardId(shardId, "")
_, _ = cqop.GetNodesByShardId(shardId, data.AvailabilityAll)

res4, _ := cqop.GetNodesByShardId(shardId, "")
res4, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
assert.Equal(t, res1, res4)
}

Expand All @@ -96,7 +96,7 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWork(t *testing.T)
cfg := getDummyConfig()
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")

res, err := cqop.GetAllNodes("")
res, err := cqop.GetAllNodes(data.AvailabilityAll)
assert.NoError(t, err)
assert.Equal(t, 2, len(res))
}
Expand All @@ -122,14 +122,14 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWorkAndBalanceObser
}
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")

res1, _ := cqop.GetAllNodes("")
res2, _ := cqop.GetAllNodes("")
res1, _ := cqop.GetAllNodes(data.AvailabilityAll)
res2, _ := cqop.GetAllNodes(data.AvailabilityAll)
assert.NotEqual(t, res1, res2)

// there are 3 observers. so after 3 steps, the queue should be the same as the original
_, _ = cqop.GetAllNodes("")
_, _ = cqop.GetAllNodes(data.AvailabilityAll)

res4, _ := cqop.GetAllNodes("")
res4, _ := cqop.GetAllNodes(data.AvailabilityAll)
assert.Equal(t, res1, res4)
}

Expand Down Expand Up @@ -172,7 +172,7 @@ func TestCircularQueueObserversProvider_GetAllObservers_ConcurrentSafe(t *testin
for i := 0; i < numOfGoRoutinesToStart; i++ {
for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
go func(mutMap *sync.RWMutex, mapCalledObs map[string]int) {
obs, _ := cqop.GetAllNodes("")
obs, _ := cqop.GetAllNodes(data.AvailabilityAll)
mutMap.Lock()
mapCalledObs[obs[0].Address]++
mutMap.Unlock()
Expand Down Expand Up @@ -232,8 +232,8 @@ func TestCircularQueueObserversProvider_GetObserversByShardId_ConcurrentSafe(t *
for i := 0; i < numOfGoRoutinesToStart; i++ {
for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
go func(mutMap *sync.RWMutex, mapCalledObs map[string]int) {
obsSh0, _ := cqop.GetNodesByShardId(shardId0, "")
obsSh1, _ := cqop.GetNodesByShardId(shardId1, "")
obsSh0, _ := cqop.GetNodesByShardId(shardId0, data.AvailabilityAll)
obsSh1, _ := cqop.GetNodesByShardId(shardId1, data.AvailabilityAll)
mutMap.Lock()
mapCalledObs[obsSh0[0].Address]++
mapCalledObs[obsSh1[0].Address]++
Expand Down
7 changes: 7 additions & 0 deletions observer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ type NodesHolder interface {
Count() int
IsInterfaceNil() bool
}

// CounterMapsHolder defines the actions to be implemented by a component that can hold multiple counter maps
type CounterMapsHolder interface {
ComputeShardPosition(availability data.ObserverDataAvailabilityType, shardID uint32, numNodes uint32) (uint32, error)
ComputeAllNodesPosition(availability data.ObserverDataAvailabilityType, numNodes uint32) (uint32, error)
IsInterfaceNil() bool
}
56 changes: 56 additions & 0 deletions observer/mapCounters/mapCounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package mapCounters

import "sync"

type mapCounter struct {
positions map[uint32]uint32
allNodesCount uint32
allNodesPosition uint32
mut sync.RWMutex
}

// newMapCounter returns a new instance of a mapCounter
func newMapCounter() *mapCounter {
return &mapCounter{
positions: make(map[uint32]uint32),
allNodesPosition: 0,
}
}

func (mc *mapCounter) computePositionForShard(shardID uint32, numNodes uint32) uint32 {
mc.mut.Lock()
defer mc.mut.Unlock()

mc.initShardPositionIfNeededUnprotected(shardID)

mc.positions[shardID]++
mc.positions[shardID] %= numNodes

return mc.positions[shardID]
}

func (mc *mapCounter) computePositionForAllNodes(numNodes uint32) uint32 {
mc.mut.Lock()
defer mc.mut.Unlock()

mc.initAllNodesPositionIfNeededUnprotected(numNodes)

mc.allNodesPosition++
mc.allNodesPosition %= numNodes

return mc.allNodesPosition
}

func (mc *mapCounter) initShardPositionIfNeededUnprotected(shardID uint32) {
_, shardExists := mc.positions[shardID]
if !shardExists {
mc.positions[shardID] = 0
}
}

func (mc *mapCounter) initAllNodesPositionIfNeededUnprotected(numNodes uint32) {
if numNodes != mc.allNodesCount {
mc.allNodesCount = numNodes
mc.allNodesPosition = 0
}
}
107 changes: 107 additions & 0 deletions observer/mapCounters/mapCounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package mapCounters

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestNewMapCounter(t *testing.T) {
t.Parallel()

mc := newMapCounter()
require.NotNil(t, mc)
require.NotNil(t, mc.positions)
}

func TestMapCounter_ComputeShardPositionShouldWorkWithDifferentNumOfNodes(t *testing.T) {
t.Parallel()

mc := newMapCounter()
computeShardPosAndAssert(t, mc, 3, 1)
computeShardPosAndAssert(t, mc, 3, 2)
computeShardPosAndAssert(t, mc, 3, 0)
computeShardPosAndAssert(t, mc, 3, 1)
// change num nodes
computeShardPosAndAssert(t, mc, 2, 0)
computeShardPosAndAssert(t, mc, 2, 1)
computeShardPosAndAssert(t, mc, 2, 0)
// change num nodes again
computeShardPosAndAssert(t, mc, 5, 1)
computeShardPosAndAssert(t, mc, 5, 2)
computeShardPosAndAssert(t, mc, 5, 3)
}

func TestMapCounter_ComputeShardPositionShouldWorkMultiShard(t *testing.T) {
t.Parallel()

mc := newMapCounter()
computeShardPosAndAssertForShard(t, mc, 0, 3, 1)
computeShardPosAndAssertForShard(t, mc, 1, 4, 1)

computeShardPosAndAssertForShard(t, mc, 0, 3, 2)
computeShardPosAndAssertForShard(t, mc, 1, 4, 2)

computeShardPosAndAssertForShard(t, mc, 0, 3, 0)
computeShardPosAndAssertForShard(t, mc, 1, 4, 3)

computeShardPosAndAssertForShard(t, mc, 0, 3, 1)
computeShardPosAndAssertForShard(t, mc, 1, 4, 0)

}

func computeShardPosAndAssertForShard(t *testing.T, mc *mapCounter, shardID uint32, numNodes uint32, expectedPos uint32) {
actualPos := mc.computePositionForShard(shardID, numNodes)
require.Equal(t, expectedPos, actualPos)
}

func computeShardPosAndAssert(t *testing.T, mc *mapCounter, numNodes uint32, expectedPos uint32) {
computeShardPosAndAssertForShard(t, mc, 0, numNodes, expectedPos)
}

func TestMapCounter_ComputeAllNodesPosition(t *testing.T) {
t.Parallel()

mc := newMapCounter()
computeAllNodesPosAndAssert(t, mc, 3, 1)
computeAllNodesPosAndAssert(t, mc, 3, 2)
computeAllNodesPosAndAssert(t, mc, 3, 0)
computeAllNodesPosAndAssert(t, mc, 3, 1)
// change num nodes - should reset
computeAllNodesPosAndAssert(t, mc, 5, 1)
computeAllNodesPosAndAssert(t, mc, 5, 2)
computeAllNodesPosAndAssert(t, mc, 5, 3)
// change num nodes again - should reset
computeAllNodesPosAndAssert(t, mc, 2, 1)
computeAllNodesPosAndAssert(t, mc, 2, 0)
computeAllNodesPosAndAssert(t, mc, 2, 1)
}

func computeAllNodesPosAndAssert(t *testing.T, mc *mapCounter, numNodes uint32, expectedPos uint32) {
actualPos := mc.computePositionForAllNodes(numNodes)
require.Equal(t, expectedPos, actualPos)
}

func TestMapCounter_ConcurrentOperations(t *testing.T) {
t.Parallel()

mc := newMapCounter()

numOperations := 10_000
wg := sync.WaitGroup{}
wg.Add(numOperations)
for i := 0; i < numOperations; i++ {
go func(idx int) {
switch idx {
case 0:
mc.computePositionForShard(uint32(idx), uint32(10+idx))
case 1:
mc.computePositionForAllNodes(uint32(10 + idx))
}
wg.Done()
}(i % 2)
}

wg.Wait()
}
Loading
Loading