Skip to content

Commit

Permalink
Merge pull request #5829 from multiversx/fix-redundancy-metric
Browse files Browse the repository at this point in the history
Redundancy metrics updates
  • Loading branch information
iulianpascalau authored Jan 16, 2024
2 parents 29ffc47 + f037a6f commit 7177707
Show file tree
Hide file tree
Showing 17 changed files with 305 additions and 6 deletions.
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ const MetricRedundancyLevel = "erd_redundancy_level"
// MetricRedundancyIsMainActive is the metric that specifies data about the redundancy main machine
const MetricRedundancyIsMainActive = "erd_redundancy_is_main_active"

// MetricRedundancyStepInReason is the metric that specifies why the back-up machine stepped in
const MetricRedundancyStepInReason = "erd_redundancy_step_in_reason"

// MetricValueNA represents the value to be used when a metric is not available/applicable
const MetricValueNA = "N/A"

Expand Down
1 change: 1 addition & 0 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ type ManagedPeersHolder interface {
GetNextPeerAuthenticationTime(pkBytes []byte) (time.Time, error)
SetNextPeerAuthenticationTime(pkBytes []byte, nextTime time.Time)
IsMultiKeyMode() bool
GetRedundancyStepInReason() string
IsInterfaceNil() bool
}

Expand Down
1 change: 1 addition & 0 deletions consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,6 @@ type KeysHandler interface {
GetAssociatedPid(pkBytes []byte) core.PeerID
IsOriginalPublicKeyOfTheNode(pkBytes []byte) bool
ResetRoundsWithoutReceivedMessages(pkBytes []byte, pid core.PeerID)
GetRedundancyStepInReason() string
IsInterfaceNil() bool
}
5 changes: 5 additions & 0 deletions consensus/spos/consensusState.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ func (cns *ConsensusState) IsMultiKeyJobDone(currentSubroundId int) bool {
return true
}

// GetMultikeyRedundancyStepInReason returns the reason if the current node stepped in as a multikey redundancy node
func (cns *ConsensusState) GetMultikeyRedundancyStepInReason() string {
return cns.keysHandler.GetRedundancyStepInReason()
}

// ResetRoundsWithoutReceivedMessages will reset the rounds received without a message for a specified public key by
// providing also the peer ID from the received message
func (cns *ConsensusState) ResetRoundsWithoutReceivedMessages(pkBytes []byte, pid core.PeerID) {
Expand Down
35 changes: 35 additions & 0 deletions consensus/spos/consensusState_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"testing"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-go/consensus"
"github.com/multiversx/mx-chain-go/consensus/spos"
Expand Down Expand Up @@ -582,3 +583,37 @@ func TestConsensusState_IsMultiKeyJobDone(t *testing.T) {
assert.True(t, cns.IsMultiKeyJobDone(0))
})
}

func TestConsensusState_GetMultikeyRedundancyStepInReason(t *testing.T) {
t.Parallel()

expectedString := "expected string"
keysHandler := &testscommon.KeysHandlerStub{
GetRedundancyStepInReasonCalled: func() string {
return expectedString
},
}
cns := internalInitConsensusStateWithKeysHandler(keysHandler)

assert.Equal(t, expectedString, cns.GetMultikeyRedundancyStepInReason())
}

func TestConsensusState_ResetRoundsWithoutReceivedMessages(t *testing.T) {
t.Parallel()

resetRoundsWithoutReceivedMessagesCalled := false
testPkBytes := []byte("pk bytes")
testPid := core.PeerID("pid")

keysHandler := &testscommon.KeysHandlerStub{
ResetRoundsWithoutReceivedMessagesCalled: func(pkBytes []byte, pid core.PeerID) {
resetRoundsWithoutReceivedMessagesCalled = true
assert.Equal(t, testPkBytes, pkBytes)
assert.Equal(t, testPid, pid)
},
}
cns := internalInitConsensusStateWithKeysHandler(keysHandler)

cns.ResetRoundsWithoutReceivedMessages(testPkBytes, testPid)
assert.True(t, resetRoundsWithoutReceivedMessagesCalled)
}
13 changes: 13 additions & 0 deletions consensus/spos/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/multiversx/mx-chain-go/process"
)

// RedundancySingleKeySteppedIn exposes the redundancySingleKeySteppedIn constant
const RedundancySingleKeySteppedIn = redundancySingleKeySteppedIn

type RoundConsensus struct {
*roundConsensus
}
Expand Down Expand Up @@ -173,6 +176,16 @@ func (wrk *Worker) CheckSelfState(cnsDta *consensus.Message) error {
return wrk.checkSelfState(cnsDta)
}

// SetRedundancyHandler -
func (wrk *Worker) SetRedundancyHandler(redundancyHandler consensus.NodeRedundancyHandler) {
wrk.nodeRedundancyHandler = redundancyHandler
}

// SetKeysHandler -
func (wrk *Worker) SetKeysHandler(keysHandler consensus.KeysHandler) {
wrk.consensusState.keysHandler = keysHandler
}

// EligibleList -
func (rcns *RoundConsensus) EligibleList() map[string]struct{} {
return rcns.eligibleNodes
Expand Down
16 changes: 15 additions & 1 deletion consensus/spos/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var _ closing.Closer = (*Worker)(nil)

// sleepTime defines the time in milliseconds between each iteration made in checkChannels method
const sleepTime = 5 * time.Millisecond
const redundancySingleKeySteppedIn = "single-key node stepped in"

// Worker defines the data needed by spos to communicate between nodes which are in the validators group
type Worker struct {
Expand Down Expand Up @@ -545,7 +546,20 @@ func (wrk *Worker) processReceivedHeaderMetric(cnsDta *consensus.Message) {
}
percent := sinceRoundStart * 100 / wrk.roundHandler.TimeDuration()
wrk.appStatusHandler.SetUInt64Value(common.MetricReceivedProposedBlock, uint64(percent))
wrk.appStatusHandler.SetStringValue(common.MetricRedundancyIsMainActive, strconv.FormatBool(wrk.nodeRedundancyHandler.IsMainMachineActive()))

isMainMachineActive, redundancyReason := wrk.computeRedundancyMetrics()
wrk.appStatusHandler.SetStringValue(common.MetricRedundancyIsMainActive, strconv.FormatBool(isMainMachineActive))
wrk.appStatusHandler.SetStringValue(common.MetricRedundancyStepInReason, redundancyReason)
}

func (wrk *Worker) computeRedundancyMetrics() (bool, string) {
if !wrk.nodeRedundancyHandler.IsMainMachineActive() {
return false, redundancySingleKeySteppedIn
}

reason := wrk.consensusState.GetMultikeyRedundancyStepInReason()

return len(reason) == 0, reason
}

func (wrk *Worker) checkSelfState(cnsDta *consensus.Message) error {
Expand Down
97 changes: 93 additions & 4 deletions consensus/spos/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"strconv"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -628,13 +629,21 @@ func TestWorker_ProcessReceivedMessageComputeReceivedProposedBlockMetric(t *test
delay := time.Millisecond * 430
roundStartTimeStamp := time.Now()

receivedValue := testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(roundStartTimeStamp, delay, roundDuration)
receivedValue, redundancyReason, redundancyStatus := testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
t,
roundStartTimeStamp,
delay,
roundDuration,
&mock.NodeRedundancyHandlerStub{},
&testscommon.KeysHandlerStub{})

minimumExpectedValue := uint64(delay * 100 / roundDuration)
assert.True(t,
receivedValue >= minimumExpectedValue,
fmt.Sprintf("minimum expected was %d, got %d", minimumExpectedValue, receivedValue),
)
assert.Empty(t, redundancyReason)
assert.True(t, redundancyStatus)
})
t.Run("time.Since returns negative value", func(t *testing.T) {
// test the edgecase when the returned NTP time stored in the round handler is
Expand All @@ -645,23 +654,101 @@ func TestWorker_ProcessReceivedMessageComputeReceivedProposedBlockMetric(t *test
delay := time.Millisecond * 430
roundStartTimeStamp := time.Now().Add(time.Minute)

receivedValue := testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(roundStartTimeStamp, delay, roundDuration)
receivedValue, redundancyReason, redundancyStatus := testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
t,
roundStartTimeStamp,
delay,
roundDuration,
&mock.NodeRedundancyHandlerStub{},
&testscommon.KeysHandlerStub{})

assert.Zero(t, receivedValue)
assert.Empty(t, redundancyReason)
assert.True(t, redundancyStatus)
})
t.Run("normal operation as a single-key redundancy node", func(t *testing.T) {
t.Parallel()

roundDuration := time.Millisecond * 1000
delay := time.Millisecond * 430
roundStartTimeStamp := time.Now()

receivedValue, redundancyReason, redundancyStatus := testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
t,
roundStartTimeStamp,
delay,
roundDuration,
&mock.NodeRedundancyHandlerStub{
IsMainMachineActiveCalled: func() bool {
return false
},
},
&testscommon.KeysHandlerStub{})

minimumExpectedValue := uint64(delay * 100 / roundDuration)
assert.True(t,
receivedValue >= minimumExpectedValue,
fmt.Sprintf("minimum expected was %d, got %d", minimumExpectedValue, receivedValue),
)
assert.Equal(t, spos.RedundancySingleKeySteppedIn, redundancyReason)
assert.False(t, redundancyStatus)
})
t.Run("normal operation as a multikey-key redundancy node", func(t *testing.T) {
t.Parallel()

roundDuration := time.Millisecond * 1000
delay := time.Millisecond * 430
roundStartTimeStamp := time.Now()

multikeyReason := "multikey step in reason"
receivedValue, redundancyReason, redundancyStatus := testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
t,
roundStartTimeStamp,
delay,
roundDuration,
&mock.NodeRedundancyHandlerStub{},
&testscommon.KeysHandlerStub{
GetRedundancyStepInReasonCalled: func() string {
return multikeyReason
},
})

minimumExpectedValue := uint64(delay * 100 / roundDuration)
assert.True(t,
receivedValue >= minimumExpectedValue,
fmt.Sprintf("minimum expected was %d, got %d", minimumExpectedValue, receivedValue),
)
assert.Equal(t, multikeyReason, redundancyReason)
assert.False(t, redundancyStatus)
})
}

func testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
t *testing.T,
roundStartTimeStamp time.Time,
delay time.Duration,
roundDuration time.Duration,
) uint64 {
redundancyHandler consensus.NodeRedundancyHandler,
keysHandler consensus.KeysHandler,
) (uint64, string, bool) {
marshaller := mock.MarshalizerMock{}
receivedValue := uint64(0)
redundancyReason := ""
redundancyStatus := false
wrk := *initWorker(&statusHandlerMock.AppStatusHandlerStub{
SetUInt64ValueHandler: func(key string, value uint64) {
receivedValue = value
},
SetStringValueHandler: func(key string, value string) {
if key == common.MetricRedundancyIsMainActive {
var err error
redundancyStatus, err = strconv.ParseBool(value)
assert.Nil(t, err)
}
if key == common.MetricRedundancyStepInReason {
redundancyReason = value
}
},
})
wrk.SetBlockProcessor(&testscommon.BlockProcessorStub{
DecodeBlockHeaderCalled: func(dta []byte) data.HeaderHandler {
Expand All @@ -686,6 +773,8 @@ func testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
return roundStartTimeStamp
},
})
wrk.SetRedundancyHandler(redundancyHandler)
wrk.SetKeysHandler(keysHandler)
hdr := &block.Header{
ChainID: chainID,
PrevHash: []byte("prev hash"),
Expand Down Expand Up @@ -725,7 +814,7 @@ func testWorkerProcessReceivedMessageComputeReceivedProposedBlockMetric(
}
_ = wrk.ProcessReceivedMessage(msg, "", &p2pmocks.MessengerStub{})

return receivedValue
return receivedValue, redundancyReason, redundancyStatus
}

func TestWorker_ProcessReceivedMessageInconsistentChainIDInConsensusMessageShouldErr(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions keysManagement/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"github.com/multiversx/mx-chain-go/common"
)

// exported constants
const (
RedundancyReasonForOneKey = redundancyReasonForOneKey
RedundancyReasonForMultipleKeys = redundancyReasonForMultipleKeys
)

// GetRoundsOfInactivity -
func (pInfo *peerInfo) GetRoundsOfInactivity() int {
pInfo.mutChangeableData.RLock()
Expand Down
5 changes: 5 additions & 0 deletions keysManagement/keysHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (handler *keysHandler) ResetRoundsWithoutReceivedMessages(pkBytes []byte, p
handler.managedPeersHolder.ResetRoundsWithoutReceivedMessages(pkBytes, pid)
}

// GetRedundancyStepInReason returns the reason if the current node stepped in as a redundancy node
func (handler *keysHandler) GetRedundancyStepInReason() string {
return handler.managedPeersHolder.GetRedundancyStepInReason()
}

// IsInterfaceNil returns true if there is no value under the interface
func (handler *keysHandler) IsInterfaceNil() bool {
return handler == nil
Expand Down
15 changes: 15 additions & 0 deletions keysManagement/keysHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,18 @@ func TestKeysHandler_ResetRoundsWithoutReceivedMessages(t *testing.T) {
assert.Equal(t, 1, len(mapResetCalled))
assert.Equal(t, 1, mapResetCalled[string(randomPublicKeyBytes)])
}

func TestKeysHandler_GetRedundancyStepInReason(t *testing.T) {
t.Parallel()

expectedString := "expected string"
args := createMockArgsKeysHandler()
args.ManagedPeersHolder = &testscommon.ManagedPeersHolderStub{
GetRedundancyStepInReasonCalled: func() string {
return expectedString
},
}

handler, _ := keysManagement.NewKeysHandler(args)
assert.Equal(t, expectedString, handler.GetRedundancyStepInReason())
}
25 changes: 25 additions & 0 deletions keysManagement/managedPeersHolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (

var log = logger.GetOrCreate("keysManagement")

const (
redundancyReasonForOneKey = "multikey node stepped in with one key"
redundancyReasonForMultipleKeys = "multikey node stepped in with %d keys"
)

type managedPeersHolder struct {
mut sync.RWMutex
defaultPeerInfoCurrentIndex int
Expand Down Expand Up @@ -369,6 +374,26 @@ func (holder *managedPeersHolder) IsMultiKeyMode() bool {
return len(holder.data) > 0
}

// GetRedundancyStepInReason returns the reason if the current node stepped in as a redundancy node
// Returns empty string if the current node is the main multikey machine, the machine is not running in multikey mode
// or the machine is acting as a backup but the main machine is acting accordingly
func (holder *managedPeersHolder) GetRedundancyStepInReason() string {
if holder.isMainMachine {
return ""
}

numManagedKeys := len(holder.GetManagedKeysByCurrentNode())
if numManagedKeys == 0 {
return ""
}

if numManagedKeys == 1 {
return redundancyReasonForOneKey
}

return fmt.Sprintf(redundancyReasonForMultipleKeys, numManagedKeys)
}

// IsInterfaceNil returns true if there is no value under the interface
func (holder *managedPeersHolder) IsInterfaceNil() bool {
return holder == nil
Expand Down
Loading

0 comments on commit 7177707

Please sign in to comment.