diff --git a/cmd/proxy/config/config.toml b/cmd/proxy/config/config.toml
index 9cf47be2..3072af75 100644
--- a/cmd/proxy/config/config.toml
+++ b/cmd/proxy/config/config.toml
@@ -39,6 +39,9 @@
     # With this flag disabled, /transaction/pool route will return an error
     AllowEntireTxPoolFetch = false
 
+    # NumberOfShards represents the total number of shards from the network (excluding metachain)
+    NumberOfShards = 3
+
 [AddressPubkeyConverter]
     #Length specifies the length in bytes of an address
     Length = 32
diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go
index 2e1b1af2..a51261dd 100644
--- a/cmd/proxy/main.go
+++ b/cmd/proxy/main.go
@@ -328,6 +328,7 @@ func createVersionsRegistryTestOrProduction(
 			ValStatsCacheValidityDurationSec:         60,
 			EconomicsMetricsCacheValidityDurationSec: 6,
 			FaucetValue:                              "10000000000",
+			NumberOfShards:                           2,
 		},
 		ApiLogging: config.ApiLoggingConfig{
 			LoggingEnabled:          true,
@@ -408,7 +409,7 @@ func createVersionsRegistry(
 		return nil, err
 	}
 
-	shardCoord, err := getShardCoordinator(cfg)
+	shardCoord, err := sharding.NewMultiShardCoordinator(uint32(cfg.GeneralSettings.NumberOfShards)+1, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -556,24 +557,6 @@ func createVersionsRegistry(
 	return versionsFactory.CreateVersionsRegistry(facadeArgs, apiConfigParser)
 }
 
-func getShardCoordinator(cfg *config.Config) (common.Coordinator, error) {
-	maxShardID := uint32(0)
-	for _, obs := range cfg.Observers {
-		shardID := obs.ShardId
-		isMetaChain := shardID == core.MetachainShardId
-		if maxShardID < shardID && !isMetaChain {
-			maxShardID = shardID
-		}
-	}
-
-	shardCoordinator, err := sharding.NewMultiShardCoordinator(maxShardID+1, 0)
-	if err != nil {
-		return nil, err
-	}
-
-	return shardCoordinator, nil
-}
-
 func startWebServer(
 	versionsRegistry data.VersionsRegistryHandler,
 	generalConfig *config.Config,
diff --git a/config/config.go b/config/config.go
index 5cd581b4..2357dc72 100644
--- a/config/config.go
+++ b/config/config.go
@@ -16,6 +16,7 @@ type GeneralSettingsConfig struct {
 	BalancedObservers                        bool
 	BalancedFullHistoryNodes                 bool
 	AllowEntireTxPoolFetch                   bool
+	NumberOfShards                           uint32
 }
 
 // Config will hold the whole config file's data
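The main.go hunks change where the shard coordinator comes from: previously it was inferred from the highest non-metachain shard id found among the configured observers (the deleted getShardCoordinator), so a config that happened to omit observers for the highest shard could silently yield a coordinator with too few shards. It is now built directly from GeneralSettings.NumberOfShards. Because that value excludes the metachain, the coordinator is created for one extra shard. A minimal self-contained sketch of the arithmetic (the literal is illustrative only; the real code calls sharding.NewMultiShardCoordinator as shown above):

package main

import "fmt"

func main() {
	// GeneralSettings.NumberOfShards counts only regular shards 0..N-1.
	numberOfShards := uint32(3)

	// The coordinator must also cover the metachain, hence the +1 in
	// sharding.NewMultiShardCoordinator(numberOfShards+1, 0) above.
	totalShards := numberOfShards + 1

	fmt.Printf("regular shards: 0..%d, coordinated shards: %d\n", numberOfShards-1, totalShards)
}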
diff --git a/observer/baseNodeProvider.go b/observer/baseNodeProvider.go
index 8395750f..00210b6a 100644
--- a/observer/baseNodeProvider.go
+++ b/observer/baseNodeProvider.go
@@ -14,6 +14,7 @@ import (
 type baseNodeProvider struct {
 	mutNodes              sync.RWMutex
 	shardIds              []uint32
+	numOfShards           uint32
 	configurationFilePath string
 	regularNodes          NodesHolder
 	snapshotlessNodes     NodesHolder
@@ -28,6 +29,19 @@ func (bnp *baseNodeProvider) initNodes(nodes []*data.NodeData) error {
 	for _, observer := range nodes {
 		shardId := observer.ShardId
 		newNodes[shardId] = append(newNodes[shardId], observer)
+		isMeta := shardId == core.MetachainShardId
+		if isMeta {
+			continue
+		}
+
+		if shardId > bnp.numOfShards {
+			return fmt.Errorf("%w for observer %s, provided shard %d, number of shards configured %d",
+				ErrInvalidShard,
+				observer.Address,
+				observer.ShardId,
+				bnp.numOfShards,
+			)
+		}
 	}
 
 	err := checkNodesInShards(newNodes)
@@ -116,10 +130,6 @@ func splitNodesByDataAvailability(nodes []*data.NodeData) ([]*data.NodeData, []*
 
 // ReloadNodes will reload the observers or the full history observers
 func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesReloadResponse {
-	bnp.mutNodes.RLock()
-	numOldShardsCount := len(bnp.shardIds)
-	bnp.mutNodes.RUnlock()
-
 	newConfig, err := loadMainConfig(bnp.configurationFilePath)
 	if err != nil {
 		return data.NodesReloadResponse{
@@ -129,21 +139,22 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
 		}
 	}
 
+	numOldShards := bnp.numOfShards
+	numNewShards := newConfig.GeneralSettings.NumberOfShards
+	if numOldShards != numNewShards {
+		return data.NodesReloadResponse{
+			OkRequest:   false,
+			Description: "not reloaded",
+			Error:       fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShards, numNewShards),
+		}
+	}
+
 	nodes := newConfig.Observers
 	if nodesType == data.FullHistoryNode {
 		nodes = newConfig.FullHistoryNodes
 	}
 
 	newNodes := nodesSliceToShardedMap(nodes)
-	numNewShardsCount := len(newNodes)
-
-	if numOldShardsCount != numNewShardsCount {
-		return data.NodesReloadResponse{
-			OkRequest:   false,
-			Description: "not reloaded",
-			Error:       fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShardsCount, numNewShardsCount),
-		}
-	}
 
 	bnp.mutNodes.Lock()
 	defer bnp.mutNodes.Unlock()
diff --git a/observer/baseNodeProvider_test.go b/observer/baseNodeProvider_test.go
index 847e3ed8..9ac5cfef 100644
--- a/observer/baseNodeProvider_test.go
+++ b/observer/baseNodeProvider_test.go
@@ -1,6 +1,8 @@
 package observer
 
 import (
+	"errors"
+	"strings"
 	"testing"
 
 	"github.com/multiversx/mx-chain-core-go/core"
@@ -40,15 +42,42 @@ func TestBaseNodeProvider_InvalidNodesConfiguration(t *testing.T) {
 		},
 	}
 
-	bnp := baseNodeProvider{}
+	bnp := baseNodeProvider{
+		numOfShards: 1,
+	}
 	err := bnp.initNodes(nodes)
 	require.Contains(t, err.Error(), "observers for shard 1 must include at least one historical (non-snapshotless) observer")
 }
 
+func TestBaseNodeProvider_InvalidShardForObserver(t *testing.T) {
+	t.Parallel()
+
+	nodes := []*data.NodeData{
+		{
+			Address:        "addr0",
+			ShardId:        0,
+			IsSnapshotless: false,
+		},
+		{
+			Address:        "addr1",
+			ShardId:        2,
+			IsSnapshotless: true,
+		},
+	}
+
+	bnp := baseNodeProvider{
+		numOfShards: 1,
+	}
+	err := bnp.initNodes(nodes)
+	require.True(t, errors.Is(err, ErrInvalidShard))
+	require.True(t, strings.Contains(err.Error(), "addr1"))
+}
+
 func TestBaseNodeProvider_ReloadNodesDifferentNumberOfNewShard(t *testing.T) {
 	bnp := &baseNodeProvider{
 		configurationFilePath: configurationPath,
 		shardIds:              []uint32{0, 1},
+		numOfShards:           2,
 	}
 
 	response := bnp.ReloadNodes(data.Observer)
@@ -66,14 +95,34 @@ func TestBaseNodeProvider_ReloadNodesConfigurationFileNotFound(t *testing.T) {
 }
 
 func TestBaseNodeProvider_ReloadNodesShouldWork(t *testing.T) {
-	bnp := &baseNodeProvider{
-		configurationFilePath: configurationPath,
-		shardIds:              []uint32{0, 1, core.MetachainShardId},
-	}
+	t.Parallel()
 
-	response := bnp.ReloadNodes(data.Observer)
-	require.True(t, response.OkRequest)
-	require.Empty(t, response.Error)
+	t.Run("same number of observer shards provided", func(t *testing.T) {
+		t.Parallel()
+
+		bnp := &baseNodeProvider{
+			configurationFilePath: configurationPath,
+			shardIds:              []uint32{0, 1, core.MetachainShardId},
+			numOfShards:           3,
+		}
+
+		response := bnp.ReloadNodes(data.Observer)
+		require.True(t, response.OkRequest)
+		require.Empty(t, response.Error)
+	})
+	t.Run("more observer shards provided", func(t *testing.T) {
+		t.Parallel()
+
+		bnp := &baseNodeProvider{
+			configurationFilePath: configurationPath,
+			shardIds:              []uint32{0, 1, core.MetachainShardId}, // no observer for shard 2, will come after reload
+			numOfShards:           3,                                     // same as in configurationPath
+		}
+
+		response := bnp.ReloadNodes(data.Observer)
+		require.True(t, response.OkRequest)
+		require.Empty(t, response.Error)
+	})
 }
 
 func TestBaseNodeProvider_prepareReloadResponseMessage(t *testing.T) {
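The initNodes check and the rewritten ReloadNodes guard enforce one invariant from two sides: every non-metachain observer must fit the configured shard range at startup, and a reload may add or remove observers but may not change the number of shards. A runnable sketch of the per-observer check (metachainShardId and errInvalidShard are local stand-ins for core.MetachainShardId and the package-level ErrInvalidShard):

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for core.MetachainShardId and observer.ErrInvalidShard.
const metachainShardId = uint32(4294967295)

var errInvalidShard = errors.New("invalid shard")

// validateObserverShard mirrors the check added to initNodes: metachain
// observers are always accepted; any other shard id above the configured
// count is rejected with a wrapped errInvalidShard.
func validateObserverShard(address string, shardId, numOfShards uint32) error {
	if shardId == metachainShardId {
		return nil
	}
	if shardId > numOfShards {
		return fmt.Errorf("%w for observer %s, provided shard %d, number of shards configured %d",
			errInvalidShard, address, shardId, numOfShards)
	}
	return nil
}

func main() {
	fmt.Println(validateObserverShard("addr1", 2, 1))               // invalid shard for observer addr1, ...
	fmt.Println(validateObserverShard("meta", metachainShardId, 1)) // <nil>
}

Note that the reload comparison now uses the configured NumberOfShards on both sides instead of counting the shards that happen to have observers, so a shard that temporarily has no observers no longer blocks a reload; this is exactly the case exercised by the "more observer shards provided" sub-test.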
diff --git a/observer/circularQueueNodesProvider.go b/observer/circularQueueNodesProvider.go
index 9236e6c8..124542d3 100644
--- a/observer/circularQueueNodesProvider.go
+++ b/observer/circularQueueNodesProvider.go
@@ -13,9 +13,14 @@ type circularQueueNodesProvider struct {
 }
 
 // NewCircularQueueNodesProvider returns a new instance of circularQueueNodesProvider
-func NewCircularQueueNodesProvider(observers []*data.NodeData, configurationFilePath string) (*circularQueueNodesProvider, error) {
+func NewCircularQueueNodesProvider(
+	observers []*data.NodeData,
+	configurationFilePath string,
+	numberOfShards uint32,
+) (*circularQueueNodesProvider, error) {
 	bop := &baseNodeProvider{
 		configurationFilePath: configurationFilePath,
+		numOfShards:           numberOfShards,
 	}
 
 	err := bop.initNodes(observers)
diff --git a/observer/circularQueueNodesProvider_test.go b/observer/circularQueueNodesProvider_test.go
index bd195f69..a24aadc8 100644
--- a/observer/circularQueueNodesProvider_test.go
+++ b/observer/circularQueueNodesProvider_test.go
@@ -23,6 +23,9 @@ func getDummyConfig() config.Config {
 				ShardId: 1,
 			},
 		},
+		GeneralSettings: config.GeneralSettingsConfig{
+			NumberOfShards: 2,
+		},
 	}
 }
 
@@ -31,7 +34,7 @@ func TestNewCircularQueueObserverProvider_EmptyObserversListShouldErr(t *testing
 	cfg := getDummyConfig()
 	cfg.Observers = make([]*data.NodeData, 0)
 
-	cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 	assert.Nil(t, cqop)
 	assert.Equal(t, ErrEmptyObserversList, err)
 }
@@ -40,7 +43,7 @@ func TestNewCircularQueueObserverProvider_ShouldWork(t *testing.T) {
 	t.Parallel()
 
 	cfg := getDummyConfig()
-	cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 	assert.Nil(t, err)
 	assert.False(t, check.IfNil(cqop))
 }
@@ -50,7 +53,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldWork(t *testi
 	shardId := uint32(0)
 
 	cfg := getDummyConfig()
-	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	res, err := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
 	assert.Nil(t, err)
@@ -77,7 +80,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldBalanceObserv
 			},
 		},
 	}
-	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	res1, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
 	res2, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
@@ -94,7 +97,7 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWork(t *testing.T)
 	t.Parallel()
 
 	cfg := getDummyConfig()
-	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	res, err := cqop.GetAllNodes(data.AvailabilityAll)
 	assert.NoError(t, err)
@@ -120,7 +123,7 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWorkAndBalanceObser
 			},
 		},
 	}
-	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	res1, _ := cqop.GetAllNodes(data.AvailabilityAll)
 	res2, _ := cqop.GetAllNodes(data.AvailabilityAll)
@@ -166,7 +169,7 @@ func TestCircularQueueObserversProvider_GetAllObservers_ConcurrentSafe(t *testin
 
 	expectedNumOfTimesAnObserverIsCalled := (numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart) / len(observers)
 
-	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	for i := 0; i < numOfGoRoutinesToStart; i++ {
 		for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
@@ -225,7 +228,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardId_ConcurrentSafe(t *
 
 	expectedNumOfTimesAnObserverIsCalled := 2 * ((numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart) / len(observers))
 
-	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	for i := 0; i < numOfGoRoutinesToStart; i++ {
 		for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
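Every call site now supplies the shard count explicitly; the tests derive it as uint32(len(cfg.Observers)) because getDummyConfig defines exactly one observer per shard. A hypothetical production caller (assuming a loaded cfg of type config.Config; NewSimpleNodesProvider, further below, takes the same three arguments):

// Hypothetical call site; the factory in nodesProviderFactory.go below
// does exactly this wiring.
provider, err := observer.NewCircularQueueNodesProvider(
	cfg.Observers,                      // observers from the loaded config
	configurationFilePath,              // re-read later by ReloadNodes
	cfg.GeneralSettings.NumberOfShards, // new: observer shard ids are validated against it
)
if err != nil {
	return err
}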
diff --git a/observer/errors.go b/observer/errors.go
index c4529012..d9af62a4 100644
--- a/observer/errors.go
+++ b/observer/errors.go
@@ -7,3 +7,6 @@ var ErrEmptyObserversList = errors.New("empty observers list")
 
 // ErrShardNotAvailable signals that the specified shard ID cannot be found in internal maps
 var ErrShardNotAvailable = errors.New("the specified shard ID does not exist in proxy's configuration")
+
+// ErrInvalidShard signals that an invalid shard has been provided
+var ErrInvalidShard = errors.New("invalid shard")
diff --git a/observer/nodesProviderFactory.go b/observer/nodesProviderFactory.go
index c860ebe6..83153d7c 100644
--- a/observer/nodesProviderFactory.go
+++ b/observer/nodesProviderFactory.go
@@ -24,16 +24,25 @@ func NewNodesProviderFactory(cfg config.Config, configurationFilePath string) (*
 // CreateObservers will create and return an object of type NodesProviderHandler based on a flag
 func (npf *nodesProviderFactory) CreateObservers() (NodesProviderHandler, error) {
 	if npf.cfg.GeneralSettings.BalancedObservers {
-		return NewCircularQueueNodesProvider(npf.cfg.Observers, npf.configurationFilePath)
+		return NewCircularQueueNodesProvider(
+			npf.cfg.Observers,
+			npf.configurationFilePath,
+			npf.cfg.GeneralSettings.NumberOfShards)
 	}
 
-	return NewSimpleNodesProvider(npf.cfg.Observers, npf.configurationFilePath)
+	return NewSimpleNodesProvider(
+		npf.cfg.Observers,
+		npf.configurationFilePath,
+		npf.cfg.GeneralSettings.NumberOfShards)
 }
 
 // CreateFullHistoryNodes will create and return an object of type NodesProviderHandler based on a flag
 func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler, error) {
 	if npf.cfg.GeneralSettings.BalancedFullHistoryNodes {
-		nodesProviderHandler, err := NewCircularQueueNodesProvider(npf.cfg.FullHistoryNodes, npf.configurationFilePath)
+		nodesProviderHandler, err := NewCircularQueueNodesProvider(
+			npf.cfg.FullHistoryNodes,
+			npf.configurationFilePath,
+			npf.cfg.GeneralSettings.NumberOfShards)
 		if err != nil {
 			return getDisabledFullHistoryNodesProviderIfNeeded(err)
 		}
@@ -41,7 +50,10 @@ func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler,
 		return nodesProviderHandler, nil
 	}
 
-	nodesProviderHandler, err := NewSimpleNodesProvider(npf.cfg.FullHistoryNodes, npf.configurationFilePath)
+	nodesProviderHandler, err := NewSimpleNodesProvider(
+		npf.cfg.FullHistoryNodes,
+		npf.configurationFilePath,
+		npf.cfg.GeneralSettings.NumberOfShards)
 	if err != nil {
 		return getDisabledFullHistoryNodesProviderIfNeeded(err)
 	}
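Because initNodes wraps ErrInvalidShard with the %w verb, callers can match the sentinel with errors.Is no matter what details the formatted message carries, which is exactly what TestBaseNodeProvider_InvalidShardForObserver asserts. A self-contained illustration of that pattern:

package main

import (
	"errors"
	"fmt"
)

var ErrInvalidShard = errors.New("invalid shard")

func main() {
	// %w keeps ErrInvalidShard in the error chain while the message still
	// names the offending observer.
	err := fmt.Errorf("%w for observer %s, provided shard %d, number of shards configured %d",
		ErrInvalidShard, "addr1", 2, 1)

	fmt.Println(errors.Is(err, ErrInvalidShard)) // true
	fmt.Println(err)
}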
diff --git a/observer/simpleNodesProvider.go b/observer/simpleNodesProvider.go
index e3e095a4..d1667ce0 100644
--- a/observer/simpleNodesProvider.go
+++ b/observer/simpleNodesProvider.go
@@ -11,9 +11,14 @@ type simpleNodesProvider struct {
 }
 
 // NewSimpleNodesProvider will return a new instance of simpleNodesProvider
-func NewSimpleNodesProvider(observers []*data.NodeData, configurationFilePath string) (*simpleNodesProvider, error) {
+func NewSimpleNodesProvider(
+	observers []*data.NodeData,
+	configurationFilePath string,
+	numberOfShards uint32,
+) (*simpleNodesProvider, error) {
 	bop := &baseNodeProvider{
 		configurationFilePath: configurationFilePath,
+		numOfShards:           numberOfShards,
 	}
 
 	err := bop.initNodes(observers)
diff --git a/observer/simpleNodesProvider_test.go b/observer/simpleNodesProvider_test.go
index 633cf373..39b8d6d0 100644
--- a/observer/simpleNodesProvider_test.go
+++ b/observer/simpleNodesProvider_test.go
@@ -16,7 +16,7 @@ func TestNewSimpleObserversProvider_EmptyObserversListShouldErr(t *testing.T) {
 	cfg := getDummyConfig()
 	cfg.Observers = make([]*data.NodeData, 0)
 
-	sop, err := NewSimpleNodesProvider(cfg.Observers, "path")
+	sop, err := NewSimpleNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 	assert.Nil(t, sop)
 	assert.Equal(t, ErrEmptyObserversList, err)
 }
@@ -25,7 +25,7 @@ func TestNewSimpleObserversProvider_ShouldWork(t *testing.T) {
 	t.Parallel()
 
 	cfg := getDummyConfig()
-	sop, err := NewSimpleNodesProvider(cfg.Observers, "path")
+	sop, err := NewSimpleNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 	assert.Nil(t, err)
 	assert.False(t, check.IfNil(sop))
 }
@@ -35,7 +35,7 @@ func TestSimpleObserversProvider_GetObserversByShardIdShouldErrBecauseInvalidSha
 	invalidShardId := uint32(37)
 
 	cfg := getDummyConfig()
-	cqop, _ := NewSimpleNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewSimpleNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	res, err := cqop.GetNodesByShardId(invalidShardId, "")
 	assert.Nil(t, res)
@@ -47,7 +47,7 @@ func TestSimpleObserversProvider_GetObserversByShardIdShouldWork(t *testing.T) {
 	shardId := uint32(0)
 
 	cfg := getDummyConfig()
-	cqop, _ := NewSimpleNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewSimpleNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	res, err := cqop.GetNodesByShardId(shardId, "")
 	assert.Nil(t, err)
@@ -58,7 +58,7 @@ func TestSimpleObserversProvider_GetAllObserversShouldWork(t *testing.T) {
 	t.Parallel()
 
 	cfg := getDummyConfig()
-	cqop, _ := NewSimpleNodesProvider(cfg.Observers, "path")
+	cqop, _ := NewSimpleNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	res, _ := cqop.GetAllNodes("")
 	assert.Equal(t, 2, len(res))
@@ -105,7 +105,7 @@ func TestSimpleObserversProvider_GetObserversByShardId_ConcurrentSafe(t *testing
 	// will be called
 	expectedNumOfTimesAnObserverIsCalled := numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart
 
-	sop, _ := NewSimpleNodesProvider(cfg.Observers, "path")
+	sop, _ := NewSimpleNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	for i := 0; i < numOfGoRoutinesToStart; i++ {
 		for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
@@ -168,7 +168,7 @@ func TestSimpleObserversProvider_GetAllObservers_ConcurrentSafe(t *testing.T) {
 	// will be called
 	expectedNumOfTimesAnObserverIsCalled := numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart
 
-	sop, _ := NewSimpleNodesProvider(cfg.Observers, "path")
+	sop, _ := NewSimpleNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
 
 	for i := 0; i < numOfGoRoutinesToStart; i++ {
 		for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
diff --git a/observer/testdata/config.toml b/observer/testdata/config.toml
index 89ccdd9f..c82a7ee4 100644
--- a/observer/testdata/config.toml
+++ b/observer/testdata/config.toml
@@ -1,25 +1,34 @@
+# GeneralSettings section of the proxy server
+[GeneralSettings]
+    # NumberOfShards represents the total number of shards from the network (excluding metachain)
+    NumberOfShards = 3
+
 # List of Observers. If you want to define a metachain observer (needed for validator statistics route) use
 # shard id 4294967295
 [[Observers]]
-ShardId = 0
-Address = "observer-shard-0"
+    ShardId = 0
+    Address = "observer-shard-0"
+
+[[Observers]]
+    ShardId = 1
+    Address = "observer-shard-1"
 
 [[Observers]]
-ShardId = 1
-Address = "observer-shard-1"
+    ShardId = 2
+    Address = "observer-shard-2"
 
 [[Observers]]
-ShardId = 4294967295
-Address = "observer-shard-4294967295"
+    ShardId = 4294967295
+    Address = "observer-shard-4294967295"
 
 [[FullHistoryNodes]]
-ShardId = 0
-Address = "full-history-observer-shard-0"
+    ShardId = 0
+    Address = "full-history-observer-shard-0"
 
 [[FullHistoryNodes]]
-ShardId = 1
-Address = "full-history-observer-shard-1"
+    ShardId = 1
+    Address = "full-history-observer-shard-1"
 
 [[FullHistoryNodes]]
-ShardId = 1
-Address = "full-history-observer-shard-4294967295"
+    ShardId = 1
+    Address = "full-history-observer-shard-4294967295"
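For an operator upgrading an existing deployment, the only functional changes in this file are the new GeneralSettings block and the extra shard-2 observer; the reindentation of the existing entries is cosmetic, since TOML ignores leading whitespace. The minimal addition to a production config would look like the excerpt below (the value is illustrative and must match the actual network topology):

[GeneralSettings]
    # Must cover every non-metachain [[Observers]] entry; ReloadNodes rejects
    # a reloaded config whose NumberOfShards differs from the running value.
    NumberOfShards = 3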