Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get number of shards from observers #463

Merged
merged 2 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions cmd/proxy/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,18 @@
# With this flag disabled, /transaction/pool route will return an error
AllowEntireTxPoolFetch = false

# NumberOfShards represents the total number of shards from the network (excluding metachain)
NumberOfShards = 3
# NumShardsTimeoutInSec represents the maximum number of seconds to wait for at least one observer online until throwing an error
NumShardsTimeoutInSec = 90

# TimeBetweenNodesRequestsInSec represents time to wait before retry to get the number of shards from observers
TimeBetweenNodesRequestsInSec = 2

[AddressPubkeyConverter]
#Length specifies the length in bytes of an address
Length = 32
#Length specifies the length in bytes of an address
Length = 32

# Type specifies the type of public keys: hex or bech32
Type = "bech32"
# Type specifies the type of public keys: hex or bech32
Type = "bech32"

[Marshalizer]
Type = "gogo protobuf"
Expand Down
36 changes: 33 additions & 3 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ func createVersionsRegistryTestOrProduction(
ValStatsCacheValidityDurationSec: 60,
EconomicsMetricsCacheValidityDurationSec: 6,
FaucetValue: "10000000000",
NumberOfShards: 3,
},
ApiLogging: config.ApiLoggingConfig{
LoggingEnabled: true,
Expand Down Expand Up @@ -409,12 +408,12 @@ func createVersionsRegistry(
return nil, err
}

shardCoord, err := sharding.NewMultiShardCoordinator(cfg.GeneralSettings.NumberOfShards, 0)
numShards, err := getNumOfShards(cfg)
if err != nil {
return nil, err
}

nodesProviderFactory, err := observer.NewNodesProviderFactory(*cfg, configurationFilePath)
nodesProviderFactory, err := observer.NewNodesProviderFactory(*cfg, configurationFilePath, numShards)
if err != nil {
return nil, err
}
Expand All @@ -431,6 +430,11 @@ func createVersionsRegistry(
}
}

shardCoord, err := sharding.NewMultiShardCoordinator(numShards, 0)
if err != nil {
return nil, err
}

bp, err := process.NewBaseProcessor(
cfg.GeneralSettings.RequestTimeoutSec,
shardCoord,
Expand Down Expand Up @@ -612,6 +616,32 @@ func waitForServerShutdown(httpServer *http.Server, closableComponents *data.Clo
_ = httpServer.Close()
}

// getNumOfShards will delay the start of proxy until it successfully gets the number of shards
func getNumOfShards(cfg *config.Config) (uint32, error) {
httpClient := &http.Client{}
httpClient.Timeout = time.Duration(cfg.GeneralSettings.RequestTimeoutSec) * time.Second
observersList := make([]string, 0, len(cfg.Observers))
for _, node := range cfg.Observers {
observersList = append(observersList, node.Address)
}
argsNumShardsProcessor := process.ArgNumShardsProcessor{
HttpClient: httpClient,
Observers: observersList,
TimeBetweenNodesRequestsInSec: cfg.GeneralSettings.TimeBetweenNodesRequestsInSec,
NumShardsTimeoutInSec: cfg.GeneralSettings.NumShardsTimeoutInSec,
RequestTimeoutInSec: cfg.GeneralSettings.RequestTimeoutSec,
}
numShardsProcessor, err := process.NewNumShardsProcessor(argsNumShardsProcessor)
if err != nil {
return 0, err
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

return numShardsProcessor.GetNetworkNumShards(ctx)
}

func removeLogColors() {
err := logger.RemoveLogObserver(os.Stdout)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type GeneralSettingsConfig struct {
BalancedObservers bool
BalancedFullHistoryNodes bool
AllowEntireTxPoolFetch bool
NumberOfShards uint32
NumShardsTimeoutInSec int
TimeBetweenNodesRequestsInSec int
}

// Config will hold the whole config file's data
Expand Down
10 changes: 0 additions & 10 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,6 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
}
}

numOldShards := bnp.numOfShards
numNewShards := newConfig.GeneralSettings.NumberOfShards
if numOldShards != numNewShards {
return data.NodesReloadResponse{
OkRequest: false,
Description: "not reloaded",
Error: fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShards, numNewShards),
}
}

nodes := newConfig.Observers
if nodesType == data.FullHistoryNode {
nodes = newConfig.FullHistoryNodes
Expand Down
12 changes: 0 additions & 12 deletions observer/baseNodeProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,6 @@ func TestBaseNodeProvider_InvalidShardForObserver(t *testing.T) {
require.True(t, strings.Contains(err.Error(), "addr1"))
}

func TestBaseNodeProvider_ReloadNodesDifferentNumberOfNewShard(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1},
numOfShards: 2,
}

response := bnp.ReloadNodes(data.Observer)
require.False(t, response.OkRequest)
require.Contains(t, response.Error, "different number of shards")
}

func TestBaseNodeProvider_ReloadNodesConfigurationFileNotFound(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: "wrong config path",
Expand Down
3 changes: 0 additions & 3 deletions observer/circularQueueNodesProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ func getDummyConfig() config.Config {
ShardId: 1,
},
},
GeneralSettings: config.GeneralSettingsConfig{
NumberOfShards: 2,
},
}
}

Expand Down
12 changes: 7 additions & 5 deletions observer/nodesProviderFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ var log = logger.GetOrCreate("observer")
type nodesProviderFactory struct {
cfg config.Config
configurationFilePath string
numberOfShards uint32
}

// NewNodesProviderFactory returns a new instance of nodesProviderFactory
func NewNodesProviderFactory(cfg config.Config, configurationFilePath string) (*nodesProviderFactory, error) {
func NewNodesProviderFactory(cfg config.Config, configurationFilePath string, numberOfShards uint32) (*nodesProviderFactory, error) {
return &nodesProviderFactory{
cfg: cfg,
configurationFilePath: configurationFilePath,
numberOfShards: numberOfShards,
}, nil
}

Expand All @@ -27,13 +29,13 @@ func (npf *nodesProviderFactory) CreateObservers() (NodesProviderHandler, error)
return NewCircularQueueNodesProvider(
npf.cfg.Observers,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
}

return NewSimpleNodesProvider(
npf.cfg.Observers,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
}

// CreateFullHistoryNodes will create and return an object of type NodesProviderHandler based on a flag
Expand All @@ -42,7 +44,7 @@ func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler,
nodesProviderHandler, err := NewCircularQueueNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}
Expand All @@ -53,7 +55,7 @@ func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler,
nodesProviderHandler, err := NewSimpleNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
npf.numberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}
Expand Down
6 changes: 3 additions & 3 deletions observer/nodesProviderFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestNewObserversProviderFactory_ShouldWork(t *testing.T) {
t.Parallel()

opf, err := NewNodesProviderFactory(config.Config{}, "path")
opf, err := NewNodesProviderFactory(config.Config{}, "path", 2)
assert.Nil(t, err)
assert.NotNil(t, opf)
}
Expand All @@ -21,7 +21,7 @@ func TestObserversProviderFactory_CreateShouldReturnSimple(t *testing.T) {
cfg := getDummyConfig()
cfg.GeneralSettings.BalancedObservers = false

opf, _ := NewNodesProviderFactory(cfg, "path")
opf, _ := NewNodesProviderFactory(cfg, "path", 2)
op, err := opf.CreateObservers()
assert.Nil(t, err)
_, ok := op.(*simpleNodesProvider)
Expand All @@ -34,7 +34,7 @@ func TestObserversProviderFactory_CreateShouldReturnCircularQueue(t *testing.T)
cfg := getDummyConfig()
cfg.GeneralSettings.BalancedObservers = true

opf, _ := NewNodesProviderFactory(cfg, "path")
opf, _ := NewNodesProviderFactory(cfg, "path", 2)
op, err := opf.CreateObservers()
assert.Nil(t, err)
_, ok := op.(*circularQueueNodesProvider)
Expand Down
9 changes: 3 additions & 6 deletions process/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,12 @@ var ErrNoFaucetAccountForGivenShard = errors.New("no faucet account found for th
// ErrNilNodesProvider signals that a nil observers provider has been provided
var ErrNilNodesProvider = errors.New("nil nodes provider")

// ErrInvalidShardId signals that a invalid shard id has been provided
var ErrInvalidShardId = errors.New("invalid shard id")

// ErrNilPubKeyConverter signals that a nil pub key converter has been provided
var ErrNilPubKeyConverter = errors.New("nil pub key converter provided")

// ErrNoValidTransactionToSend signals that no valid transaction were received
var ErrNoValidTransactionToSend = errors.New("no valid transaction to send")

// ErrNilDatabaseConnector signals that a nil database connector was provided
var ErrNilDatabaseConnector = errors.New("not valid database connector")

// ErrCannotParseNodeStatusMetrics signals that the node status metrics cannot be parsed
var ErrCannotParseNodeStatusMetrics = errors.New("cannot parse node status metrics")

Expand Down Expand Up @@ -115,3 +109,6 @@ var ErrEmptyCommitString = errors.New("empty commit id string")

// ErrEmptyPubKey signals that an empty public key has been provided
var ErrEmptyPubKey = errors.New("public key is empty")

// ErrNilHttpClient signals that a nil http client has been provided
var ErrNilHttpClient = errors.New("nil http client")
7 changes: 7 additions & 0 deletions process/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package process

import (
"net/http"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-core-go/data/vm"
Expand Down Expand Up @@ -78,3 +80,8 @@ type StatusMetricsProvider interface {
GetMetricsForPrometheus() string
IsInterfaceNil() bool
}

// HttpClient defines an interface for the http client
type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
}
16 changes: 16 additions & 0 deletions process/mock/httpClientMock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mock

import "net/http"

// HttpClientMock -
type HttpClientMock struct {
DoCalled func(req *http.Request) (*http.Response, error)
}

// Do -
func (mock *HttpClientMock) Do(req *http.Request) (*http.Response, error) {
if mock.DoCalled != nil {
return mock.DoCalled(req)
}
return &http.Response{}, nil
}
Loading
Loading