Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable number of shards #460

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/proxy/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
# With this flag disabled, /transaction/pool route will return an error
AllowEntireTxPoolFetch = false

# NumberOfShards represents the total number of shards from the network (excluding metachain)
NumberOfShards = 3

[AddressPubkeyConverter]
#Length specifies the length in bytes of an address
Length = 32
Expand Down
21 changes: 2 additions & 19 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func createVersionsRegistryTestOrProduction(
ValStatsCacheValidityDurationSec: 60,
EconomicsMetricsCacheValidityDurationSec: 6,
FaucetValue: "10000000000",
NumberOfShards: 3,
},
ApiLogging: config.ApiLoggingConfig{
LoggingEnabled: true,
Expand Down Expand Up @@ -408,7 +409,7 @@ func createVersionsRegistry(
return nil, err
}

shardCoord, err := getShardCoordinator(cfg)
shardCoord, err := sharding.NewMultiShardCoordinator(cfg.GeneralSettings.NumberOfShards, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -556,24 +557,6 @@ func createVersionsRegistry(
return versionsFactory.CreateVersionsRegistry(facadeArgs, apiConfigParser)
}

func getShardCoordinator(cfg *config.Config) (common.Coordinator, error) {
maxShardID := uint32(0)
for _, obs := range cfg.Observers {
shardID := obs.ShardId
isMetaChain := shardID == core.MetachainShardId
if maxShardID < shardID && !isMetaChain {
maxShardID = shardID
}
}

shardCoordinator, err := sharding.NewMultiShardCoordinator(maxShardID+1, 0)
if err != nil {
return nil, err
}

return shardCoordinator, nil
}

func startWebServer(
versionsRegistry data.VersionsRegistryHandler,
generalConfig *config.Config,
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type GeneralSettingsConfig struct {
BalancedObservers bool
BalancedFullHistoryNodes bool
AllowEntireTxPoolFetch bool
NumberOfShards uint32
}

// Config will hold the whole config file's data
Expand Down
37 changes: 24 additions & 13 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type baseNodeProvider struct {
mutNodes sync.RWMutex
shardIds []uint32
numOfShards uint32
configurationFilePath string
regularNodes NodesHolder
snapshotlessNodes NodesHolder
Expand All @@ -28,6 +29,19 @@ func (bnp *baseNodeProvider) initNodes(nodes []*data.NodeData) error {
for _, observer := range nodes {
shardId := observer.ShardId
newNodes[shardId] = append(newNodes[shardId], observer)
isMeta := shardId == core.MetachainShardId
if isMeta {
continue
}

if shardId >= bnp.numOfShards {
return fmt.Errorf("%w for observer %s, provided shard %d, number of shards configured %d",
ErrInvalidShard,
observer.Address,
observer.ShardId,
bnp.numOfShards,
)
}
}

err := checkNodesInShards(newNodes)
Expand Down Expand Up @@ -116,10 +130,6 @@ func splitNodesByDataAvailability(nodes []*data.NodeData) ([]*data.NodeData, []*

// ReloadNodes will reload the observers or the full history observers
func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesReloadResponse {
bnp.mutNodes.RLock()
numOldShardsCount := len(bnp.shardIds)
bnp.mutNodes.RUnlock()

newConfig, err := loadMainConfig(bnp.configurationFilePath)
if err != nil {
return data.NodesReloadResponse{
Expand All @@ -129,21 +139,22 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
}
}

numOldShards := bnp.numOfShards
numNewShards := newConfig.GeneralSettings.NumberOfShards
if numOldShards != numNewShards {
return data.NodesReloadResponse{
OkRequest: false,
Description: "not reloaded",
Error: fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShards, numNewShards),
}
}

nodes := newConfig.Observers
if nodesType == data.FullHistoryNode {
nodes = newConfig.FullHistoryNodes
}

newNodes := nodesSliceToShardedMap(nodes)
numNewShardsCount := len(newNodes)

if numOldShardsCount != numNewShardsCount {
return data.NodesReloadResponse{
OkRequest: false,
Description: "not reloaded",
Error: fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShardsCount, numNewShardsCount),
}
}

bnp.mutNodes.Lock()
defer bnp.mutNodes.Unlock()
Expand Down
65 changes: 57 additions & 8 deletions observer/baseNodeProvider_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package observer

import (
"errors"
"strings"
"testing"

"github.com/multiversx/mx-chain-core-go/core"
Expand Down Expand Up @@ -40,15 +42,42 @@ func TestBaseNodeProvider_InvalidNodesConfiguration(t *testing.T) {
},
}

bnp := baseNodeProvider{}
bnp := baseNodeProvider{
numOfShards: 2,
}
err := bnp.initNodes(nodes)
require.Contains(t, err.Error(), "observers for shard 1 must include at least one historical (non-snapshotless) observer")
}

func TestBaseNodeProvider_InvalidShardForObserver(t *testing.T) {
t.Parallel()

nodes := []*data.NodeData{
{
Address: "addr0",
ShardId: 0,
IsSnapshotless: false,
},
{
Address: "addr1",
ShardId: 1,
IsSnapshotless: true,
},
}

bnp := baseNodeProvider{
numOfShards: 1,
}
err := bnp.initNodes(nodes)
require.True(t, errors.Is(err, ErrInvalidShard))
require.True(t, strings.Contains(err.Error(), "addr1"))
}

func TestBaseNodeProvider_ReloadNodesDifferentNumberOfNewShard(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1},
numOfShards: 2,
}

response := bnp.ReloadNodes(data.Observer)
Expand All @@ -66,14 +95,34 @@ func TestBaseNodeProvider_ReloadNodesConfigurationFileNotFound(t *testing.T) {
}

func TestBaseNodeProvider_ReloadNodesShouldWork(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1, core.MetachainShardId},
}
t.Parallel()

response := bnp.ReloadNodes(data.Observer)
require.True(t, response.OkRequest)
require.Empty(t, response.Error)
t.Run("same number of observer shards provided", func(t *testing.T) {
t.Parallel()

bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1, core.MetachainShardId},
numOfShards: 3,
}

response := bnp.ReloadNodes(data.Observer)
require.True(t, response.OkRequest)
require.Empty(t, response.Error)
})
t.Run("more observer shards provided", func(t *testing.T) {
t.Parallel()

bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1, core.MetachainShardId}, // no observer for shard 2, will come after reload
numOfShards: 3, // same as in configurationPath
}

response := bnp.ReloadNodes(data.Observer)
require.True(t, response.OkRequest)
require.Empty(t, response.Error)
})
}

func TestBaseNodeProvider_prepareReloadResponseMessage(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion observer/circularQueueNodesProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ type circularQueueNodesProvider struct {
}

// NewCircularQueueNodesProvider returns a new instance of circularQueueNodesProvider
func NewCircularQueueNodesProvider(observers []*data.NodeData, configurationFilePath string) (*circularQueueNodesProvider, error) {
func NewCircularQueueNodesProvider(
observers []*data.NodeData,
configurationFilePath string,
numberOfShards uint32,
) (*circularQueueNodesProvider, error) {
bop := &baseNodeProvider{
configurationFilePath: configurationFilePath,
numOfShards: numberOfShards,
}

err := bop.initNodes(observers)
Expand Down
19 changes: 11 additions & 8 deletions observer/circularQueueNodesProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func getDummyConfig() config.Config {
ShardId: 1,
},
},
GeneralSettings: config.GeneralSettingsConfig{
NumberOfShards: 2,
raduchis marked this conversation as resolved.
Show resolved Hide resolved
},
}
}

Expand All @@ -31,7 +34,7 @@ func TestNewCircularQueueObserverProvider_EmptyObserversListShouldErr(t *testing

cfg := getDummyConfig()
cfg.Observers = make([]*data.NodeData, 0)
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
assert.Nil(t, cqop)
assert.Equal(t, ErrEmptyObserversList, err)
}
Expand All @@ -40,7 +43,7 @@ func TestNewCircularQueueObserverProvider_ShouldWork(t *testing.T) {
t.Parallel()

cfg := getDummyConfig()
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
assert.Nil(t, err)
assert.False(t, check.IfNil(cqop))
}
Expand All @@ -50,7 +53,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldWork(t *testi

shardId := uint32(0)
cfg := getDummyConfig()
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res, err := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
assert.Nil(t, err)
Expand All @@ -77,7 +80,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldBalanceObserv
},
},
}
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res1, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
res2, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
Expand All @@ -94,7 +97,7 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWork(t *testing.T)
t.Parallel()

cfg := getDummyConfig()
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res, err := cqop.GetAllNodes(data.AvailabilityAll)
assert.NoError(t, err)
Expand All @@ -120,7 +123,7 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWorkAndBalanceObser
},
},
}
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res1, _ := cqop.GetAllNodes(data.AvailabilityAll)
res2, _ := cqop.GetAllNodes(data.AvailabilityAll)
Expand Down Expand Up @@ -166,7 +169,7 @@ func TestCircularQueueObserversProvider_GetAllObservers_ConcurrentSafe(t *testin

expectedNumOfTimesAnObserverIsCalled := (numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart) / len(observers)

cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

for i := 0; i < numOfGoRoutinesToStart; i++ {
for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
Expand Down Expand Up @@ -225,7 +228,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardId_ConcurrentSafe(t *

expectedNumOfTimesAnObserverIsCalled := 2 * ((numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart) / len(observers))

cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

for i := 0; i < numOfGoRoutinesToStart; i++ {
for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
Expand Down
3 changes: 3 additions & 0 deletions observer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ var ErrEmptyObserversList = errors.New("empty observers list")

// ErrShardNotAvailable signals that the specified shard ID cannot be found in internal maps
var ErrShardNotAvailable = errors.New("the specified shard ID does not exist in proxy's configuration")

// ErrInvalidShard signals that an invalid shard has been provided
var ErrInvalidShard = errors.New("invalid shard")
20 changes: 16 additions & 4 deletions observer/nodesProviderFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,36 @@ func NewNodesProviderFactory(cfg config.Config, configurationFilePath string) (*
// CreateObservers will create and return an object of type NodesProviderHandler based on a flag
func (npf *nodesProviderFactory) CreateObservers() (NodesProviderHandler, error) {
if npf.cfg.GeneralSettings.BalancedObservers {
return NewCircularQueueNodesProvider(npf.cfg.Observers, npf.configurationFilePath)
return NewCircularQueueNodesProvider(
npf.cfg.Observers,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
}

return NewSimpleNodesProvider(npf.cfg.Observers, npf.configurationFilePath)
return NewSimpleNodesProvider(
npf.cfg.Observers,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
}

// CreateFullHistoryNodes will create and return an object of type NodesProviderHandler based on a flag
func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler, error) {
if npf.cfg.GeneralSettings.BalancedFullHistoryNodes {
nodesProviderHandler, err := NewCircularQueueNodesProvider(npf.cfg.FullHistoryNodes, npf.configurationFilePath)
nodesProviderHandler, err := NewCircularQueueNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}

return nodesProviderHandler, nil
}

nodesProviderHandler, err := NewSimpleNodesProvider(npf.cfg.FullHistoryNodes, npf.configurationFilePath)
nodesProviderHandler, err := NewSimpleNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}
Expand Down
7 changes: 6 additions & 1 deletion observer/simpleNodesProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ type simpleNodesProvider struct {
}

// NewSimpleNodesProvider will return a new instance of simpleNodesProvider
func NewSimpleNodesProvider(observers []*data.NodeData, configurationFilePath string) (*simpleNodesProvider, error) {
func NewSimpleNodesProvider(
observers []*data.NodeData,
configurationFilePath string,
numberOfShards uint32,
) (*simpleNodesProvider, error) {
bop := &baseNodeProvider{
configurationFilePath: configurationFilePath,
numOfShards: numberOfShards,
}

err := bop.initNodes(observers)
Expand Down
Loading
Loading