Skip to content

Commit

Permalink
Merge pull request #463 from multiversx/get_number_of_shards_from_obs…
Browse files Browse the repository at this point in the history
…ervers

Get number of shards from observers
  • Loading branch information
sstanculeanu authored Oct 3, 2024
2 parents 480651a + 2e35868 commit c170a4c
Show file tree
Hide file tree
Showing 13 changed files with 399 additions and 49 deletions.
15 changes: 9 additions & 6 deletions cmd/proxy/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,18 @@
# With this flag disabled, /transaction/pool route will return an error
AllowEntireTxPoolFetch = false

# NumberOfShards represents the total number of shards from the network (excluding metachain)
NumberOfShards = 3
# NumShardsTimeoutInSec represents the maximum number of seconds to wait for at least one observer online until throwing an error
NumShardsTimeoutInSec = 90

# TimeBetweenNodesRequestsInSec represents time to wait before retry to get the number of shards from observers
TimeBetweenNodesRequestsInSec = 2

[AddressPubkeyConverter]
#Length specifies the length in bytes of an address
Length = 32
#Length specifies the length in bytes of an address
Length = 32

# Type specifies the type of public keys: hex or bech32
Type = "bech32"
# Type specifies the type of public keys: hex or bech32
Type = "bech32"

[Marshalizer]
Type = "gogo protobuf"
Expand Down
36 changes: 33 additions & 3 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ func createVersionsRegistryTestOrProduction(
ValStatsCacheValidityDurationSec: 60,
EconomicsMetricsCacheValidityDurationSec: 6,
FaucetValue: "10000000000",
NumberOfShards: 3,
},
ApiLogging: config.ApiLoggingConfig{
LoggingEnabled: true,
Expand Down Expand Up @@ -409,12 +408,12 @@ func createVersionsRegistry(
return nil, err
}

shardCoord, err := sharding.NewMultiShardCoordinator(cfg.GeneralSettings.NumberOfShards, 0)
numShards, err := getNumOfShards(cfg)
if err != nil {
return nil, err
}

nodesProviderFactory, err := observer.NewNodesProviderFactory(*cfg, configurationFilePath)
nodesProviderFactory, err := observer.NewNodesProviderFactory(*cfg, configurationFilePath, numShards)
if err != nil {
return nil, err
}
Expand All @@ -431,6 +430,11 @@ func createVersionsRegistry(
}
}

shardCoord, err := sharding.NewMultiShardCoordinator(numShards, 0)
if err != nil {
return nil, err
}

bp, err := process.NewBaseProcessor(
cfg.GeneralSettings.RequestTimeoutSec,
shardCoord,
Expand Down Expand Up @@ -612,6 +616,32 @@ func waitForServerShutdown(httpServer *http.Server, closableComponents *data.Clo
_ = httpServer.Close()
}

// getNumOfShards will delay the start of proxy until it successfully gets the number of shards
func getNumOfShards(cfg *config.Config) (uint32, error) {
httpClient := &http.Client{}
httpClient.Timeout = time.Duration(cfg.GeneralSettings.RequestTimeoutSec) * time.Second
observersList := make([]string, 0, len(cfg.Observers))
for _, node := range cfg.Observers {
observersList = append(observersList, node.Address)
}
argsNumShardsProcessor := process.ArgNumShardsProcessor{
HttpClient: httpClient,
Observers: observersList,
TimeBetweenNodesRequestsInSec: cfg.GeneralSettings.TimeBetweenNodesRequestsInSec,
NumShardsTimeoutInSec: cfg.GeneralSettings.NumShardsTimeoutInSec,
RequestTimeoutInSec: cfg.GeneralSettings.RequestTimeoutSec,
}
numShardsProcessor, err := process.NewNumShardsProcessor(argsNumShardsProcessor)
if err != nil {
return 0, err
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

return numShardsProcessor.GetNetworkNumShards(ctx)
}

func removeLogColors() {
err := logger.RemoveLogObserver(os.Stdout)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type GeneralSettingsConfig struct {
BalancedObservers bool
BalancedFullHistoryNodes bool
AllowEntireTxPoolFetch bool
NumberOfShards uint32
NumShardsTimeoutInSec int
TimeBetweenNodesRequestsInSec int
}

// Config will hold the whole config file's data
Expand Down
10 changes: 0 additions & 10 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,6 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
}
}

numOldShards := bnp.numOfShards
numNewShards := newConfig.GeneralSettings.NumberOfShards
if numOldShards != numNewShards {
return data.NodesReloadResponse{
OkRequest: false,
Description: "not reloaded",
Error: fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShards, numNewShards),
}
}

nodes := newConfig.Observers
if nodesType == data.FullHistoryNode {
nodes = newConfig.FullHistoryNodes
Expand Down
12 changes: 0 additions & 12 deletions observer/baseNodeProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,6 @@ func TestBaseNodeProvider_InvalidShardForObserver(t *testing.T) {
require.True(t, strings.Contains(err.Error(), "addr1"))
}

func TestBaseNodeProvider_ReloadNodesDifferentNumberOfNewShard(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1},
numOfShards: 2,
}

response := bnp.ReloadNodes(data.Observer)
require.False(t, response.OkRequest)
require.Contains(t, response.Error, "different number of shards")
}

func TestBaseNodeProvider_ReloadNodesConfigurationFileNotFound(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: "wrong config path",
Expand Down
3 changes: 0 additions & 3 deletions observer/circularQueueNodesProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ func getDummyConfig() config.Config {
ShardId: 1,
},
},
GeneralSettings: config.GeneralSettingsConfig{
NumberOfShards: 2,
},
}
}

Expand Down
12 changes: 7 additions & 5 deletions observer/nodesProviderFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ var log = logger.GetOrCreate("observer")
type nodesProviderFactory struct {
cfg config.Config
configurationFilePath string
numberOfShards uint32
}

// NewNodesProviderFactory returns a new instance of nodesProviderFactory
func NewNodesProviderFactory(cfg config.Config, configurationFilePath string) (*nodesProviderFactory, error) {
func NewNodesProviderFactory(cfg config.Config, configurationFilePath string, numberOfShards uint32) (*nodesProviderFactory, error) {
return &nodesProviderFactory{
cfg: cfg,
configurationFilePath: configurationFilePath,
numberOfShards: numberOfShards,
}, nil
}

Expand All @@ -27,13 +29,13 @@ func (npf *nodesProviderFactory) CreateObservers() (NodesProviderHandler, error)
return NewCircularQueueNodesProvider(
npf.cfg.Observers,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
}

return NewSimpleNodesProvider(
npf.cfg.Observers,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
}

// CreateFullHistoryNodes will create and return an object of type NodesProviderHandler based on a flag
Expand All @@ -42,7 +44,7 @@ func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler,
nodesProviderHandler, err := NewCircularQueueNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}
Expand All @@ -53,7 +55,7 @@ func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler,
nodesProviderHandler, err := NewSimpleNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}
Expand Down
6 changes: 3 additions & 3 deletions observer/nodesProviderFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestNewObserversProviderFactory_ShouldWork(t *testing.T) {
t.Parallel()

opf, err := NewNodesProviderFactory(config.Config{}, "path")
opf, err := NewNodesProviderFactory(config.Config{}, "path", 2)
assert.Nil(t, err)
assert.NotNil(t, opf)
}
Expand All @@ -21,7 +21,7 @@ func TestObserversProviderFactory_CreateShouldReturnSimple(t *testing.T) {
cfg := getDummyConfig()
cfg.GeneralSettings.BalancedObservers = false

opf, _ := NewNodesProviderFactory(cfg, "path")
opf, _ := NewNodesProviderFactory(cfg, "path", 2)
op, err := opf.CreateObservers()
assert.Nil(t, err)
_, ok := op.(*simpleNodesProvider)
Expand All @@ -34,7 +34,7 @@ func TestObserversProviderFactory_CreateShouldReturnCircularQueue(t *testing.T)
cfg := getDummyConfig()
cfg.GeneralSettings.BalancedObservers = true

opf, _ := NewNodesProviderFactory(cfg, "path")
opf, _ := NewNodesProviderFactory(cfg, "path", 2)
op, err := opf.CreateObservers()
assert.Nil(t, err)
_, ok := op.(*circularQueueNodesProvider)
Expand Down
9 changes: 3 additions & 6 deletions process/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,12 @@ var ErrNoFaucetAccountForGivenShard = errors.New("no faucet account found for th
// ErrNilNodesProvider signals that a nil observers provider has been provided
var ErrNilNodesProvider = errors.New("nil nodes provider")

// ErrInvalidShardId signals that a invalid shard id has been provided
var ErrInvalidShardId = errors.New("invalid shard id")

// ErrNilPubKeyConverter signals that a nil pub key converter has been provided
var ErrNilPubKeyConverter = errors.New("nil pub key converter provided")

// ErrNoValidTransactionToSend signals that no valid transaction were received
var ErrNoValidTransactionToSend = errors.New("no valid transaction to send")

// ErrNilDatabaseConnector signals that a nil database connector was provided
var ErrNilDatabaseConnector = errors.New("not valid database connector")

// ErrCannotParseNodeStatusMetrics signals that the node status metrics cannot be parsed
var ErrCannotParseNodeStatusMetrics = errors.New("cannot parse node status metrics")

Expand Down Expand Up @@ -115,3 +109,6 @@ var ErrEmptyCommitString = errors.New("empty commit id string")

// ErrEmptyPubKey signals that an empty public key has been provided
var ErrEmptyPubKey = errors.New("public key is empty")

// ErrNilHttpClient signals that a nil http client has been provided
var ErrNilHttpClient = errors.New("nil http client")
7 changes: 7 additions & 0 deletions process/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package process

import (
"net/http"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-core-go/data/vm"
Expand Down Expand Up @@ -78,3 +80,8 @@ type StatusMetricsProvider interface {
GetMetricsForPrometheus() string
IsInterfaceNil() bool
}

// HttpClient defines an interface for the http client
type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
}
16 changes: 16 additions & 0 deletions process/mock/httpClientMock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mock

import "net/http"

// HttpClientMock -
type HttpClientMock struct {
DoCalled func(req *http.Request) (*http.Response, error)
}

// Do -
func (mock *HttpClientMock) Do(req *http.Request) (*http.Response, error) {
if mock.DoCalled != nil {
return mock.DoCalled(req)
}
return &http.Response{}, nil
}
Loading

0 comments on commit c170a4c

Please sign in to comment.