Skip to content

Commit

Permalink
fix: decrease success rate if validator fails to relay (#1295)
Browse files Browse the repository at this point in the history
# Related Github tickets

- Closes VolumeFi#2173

# Background

Currently if the assigned validator fails to relay a message it is
simply removed from the queue and does not count as a failure to the
assigned validator. This means if a validator is incapable or unwilling
to relay messages, it's success rate metric never changes.

When removing messages from the queue, we check if it was relayed (it
has public or error data). If it does not, then we consider it a failure
and update the validator metrics.

# Testing completed

- [x] test coverage exists or has been added/updated
- [x] tested in a private testnet

# Breaking changes

- [x] I have checked my code for breaking changes
- [x] If there are breaking changes, there is a supporting migration.
  • Loading branch information
maharifu authored Sep 19, 2024
1 parent fe0ae96 commit a5377d2
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 35 deletions.
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ func New(
consensusRegistry,
&app.TreasuryKeeper,
)
app.ConsensusKeeper.AddMessageConsensusAttestedListener(&app.MetrixKeeper)

app.EvmKeeper = *evmmodulekeeper.NewKeeper(
appCodec,
Expand Down
76 changes: 65 additions & 11 deletions x/consensus/keeper/concensus_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/palomachain/paloma/x/consensus/keeper/filters"
"github.com/palomachain/paloma/x/consensus/types"
evmtypes "github.com/palomachain/paloma/x/evm/types"
metrixtypes "github.com/palomachain/paloma/x/metrix/types"
valsettypes "github.com/palomachain/paloma/x/valset/types"
)

Expand Down Expand Up @@ -287,35 +288,88 @@ func (k Keeper) GetMessagesFromQueue(ctx context.Context, queueTypeName string,
// Any message that actually reached consensus will be removed from the queue during
// attestation, other messages like superfluous valset updates will get removed
// in their respective logic flows, but none of them should be using this function.
func (k Keeper) PruneJob(ctx sdk.Context, queueTypeName string, id uint64) (err error) {
if err := k.jailValidatorsWhichMissedAttestation(ctx, queueTypeName, id); err != nil {
liblog.FromSDKLogger(k.Logger(ctx)).
func (k Keeper) PruneJob(sdkCtx sdk.Context, queueTypeName string, id uint64) error {
err := k.jailValidatorsIfNecessary(sdkCtx, queueTypeName, id)
if err != nil {
liblog.FromSDKLogger(k.Logger(sdkCtx)).
WithError(err).
WithFields("msg-id", id).
WithFields("queue-type-name", queueTypeName).
Error("Failed to jail validators that missed attestation.")
Error("Failed to jail validators.")
}

return k.DeleteJob(ctx, queueTypeName, id)
return k.DeleteJob(sdkCtx, queueTypeName, id)
}

func (k Keeper) jailValidatorsWhichMissedAttestation(ctx sdk.Context, queueTypeName string, id uint64) error {
cq, err := k.getConsensusQueue(ctx, queueTypeName)
func (k Keeper) jailValidatorsIfNecessary(
sdkCtx sdk.Context,
queueTypeName string,
id uint64,
) error {
cq, err := k.getConsensusQueue(sdkCtx, queueTypeName)
if err != nil {
return fmt.Errorf("getConsensusQueue: %w", err)
}

msg, err := cq.GetMsgByID(ctx, id)
msg, err := cq.GetMsgByID(sdkCtx, id)
if err != nil {
return fmt.Errorf("getMsgByID: %w", err)
}

if msg.GetPublicAccessData() == nil && msg.GetErrorData() == nil {
// This message was never successfully handled, attestation flock
// should not be punished for this.
// The message was never delivered, so we need to update the validator
// metrics with a failure
return k.punishValidatorForMissingRelay(sdkCtx, msg)
}

// Otherwise, there was a delivery attempt, so only jail validators that
// missed attestation
return k.jailValidatorsWhichMissedAttestation(sdkCtx, msg)
}

func (k Keeper) punishValidatorForMissingRelay(
sdkCtx sdk.Context,
msg types.QueuedSignedMessageI,
) error {
if msg.GetGasEstimate() == 0 {
// If we don't have a gas estimate, this was probably not the
// validator's fault, so we do nothing
return nil
}

consensusMsg, err := msg.ConsensusMsg(k.cdc)
if err != nil {
return err
}

message, ok := consensusMsg.(*evmtypes.Message)
if !ok {
// If this is not a turnstone message, we don't want it
return nil
}

valAddr, err := sdk.ValAddressFromBech32(message.Assignee)
if err != nil {
return err
}

for _, v := range k.onMessageAttestedListeners {
v.OnConsensusMessageAttested(sdkCtx, metrixtypes.MessageAttestedEvent{
AssignedAtBlockHeight: message.AssignedAtBlockHeight,
HandledAtBlockHeight: math.NewInt(sdkCtx.BlockHeight()),
Assignee: valAddr,
MessageID: msg.GetId(),
WasRelayedSuccessfully: false,
})
}

return nil
}

func (k Keeper) jailValidatorsWhichMissedAttestation(
ctx sdk.Context,
msg types.QueuedSignedMessageI,
) error {
r, err := k.consensusChecker.VerifyEvidence(ctx,
slice.Map(msg.GetEvidence(), func(evidence *types.Evidence) libcons.Evidence {
return evidence
Expand Down Expand Up @@ -371,7 +425,7 @@ func (k Keeper) jailValidatorsWhichMissedAttestation(ctx sdk.Context, queueTypeN
// This validator is part of the active valset but did not supply evidence.
// That's not very nice. Let's jail them.
if err := k.valset.Jail(ctx, v.GetAddress(), fmt.Sprintf("No evidence supplied for contentious message %d", msg.GetId())); err != nil {
liblog.FromSDKLogger(k.Logger(ctx)).WithError(err).WithValidator(v.GetAddress().String()).WithFields("msg-id", id).Error("Failed to jail validator.")
liblog.FromSDKLogger(k.Logger(ctx)).WithError(err).WithValidator(v.GetAddress().String()).WithFields("msg-id", msg.GetId()).Error("Failed to jail validator.")
}
}
}
Expand Down
51 changes: 37 additions & 14 deletions x/consensus/keeper/concensus_keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ func TestEndToEndTestingOfPuttingAndGettingMessagesOfTheConsensusQueue(t *testin
})
}

func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
func TestJailValidatorsIfNecessary(t *testing.T) {
queue := types.Queue(defaultQueueName, chainType, chainReferenceID)
keeper, ms, ctx := newConsensusKeeper(t)
msgType := &types.SimpleMessage{}
msgType := &evmtypes.Message{}
serializedTx, err := hex.DecodeString("02f87201108405f5e100850b68a0aa00825208941f9c2e67dbbe4c457a5e2be0bc31e67ce5953a2d87470de4df82000080c001a0e05de0771f8d577ec5aa440612c0e8f560d732d5162db0187cfaf56ac50c3716a0147565f4b0924a5adda25f55330c385448e0507d1219d4dac0950e2872682124")
require.NoError(t, err)

Expand All @@ -111,21 +111,24 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
)

t.Run("with unsupported queue name", func(t *testing.T) {
err := keeper.jailValidatorsWhichMissedAttestation(ctx, "no support", 0)
err := keeper.jailValidatorsIfNecessary(ctx, "no support", 0)
require.ErrorContains(t, err, "getConsensusQueue", "returns an error")
})

t.Run("with unknown message ID", func(t *testing.T) {
err := keeper.jailValidatorsWhichMissedAttestation(ctx, queue, 42)
err := keeper.jailValidatorsIfNecessary(ctx, queue, 42)
require.ErrorContains(t, err, "getMsgByID", "returns an error")
})

t.Run("with unknown message ID", func(t *testing.T) {
err := keeper.jailValidatorsWhichMissedAttestation(ctx, queue, 42)
err := keeper.jailValidatorsIfNecessary(ctx, queue, 42)
require.ErrorContains(t, err, "getMsgByID", "returns an error")
})

testMsg := types.SimpleMessage{Sender: "user", Hello: "foo", World: "bar"}
assignee, _ := sdk.ValAddressFromBech32("palomavaloper1tsu8nthuspe4zlkejtj3v27rtq8qz7q6983zt2")
testMsg := evmtypes.Message{
Assignee: assignee.String(),
}
t.Run("with message that actually forms consensus", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -159,7 +162,7 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(4000),
}, nil).Times(1)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.Error(t, err)
require.ErrorContains(t, err, "unexpected message with valid consensus found, skipping jailing steps")
})
Expand Down Expand Up @@ -198,9 +201,10 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(10000),
}, nil).Times(2)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})

t.Run("with expected validators missing", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -237,17 +241,35 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
ms.ValsetKeeper.On("Jail", mock.Anything, validators[0].GetAddress(), mock.Anything).Return(nil)
ms.ValsetKeeper.On("Jail", mock.Anything, validators[1].GetAddress(), mock.Anything).Return(nil)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})
t.Run("with neither error nor public access data set without gas estimate", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg,
&consensus.PutOptions{RequireGasEstimation: true})
require.NoError(t, err)

err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})
t.Run("with neither error nor public access data set", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, nil)
t.Run("with neither error nor public access data set with gas estimate", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg,
&consensus.PutOptions{RequireGasEstimation: true})
require.NoError(t, err)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
cq, err := keeper.getConsensusQueue(ctx, queue)
require.NoError(t, err)

err = cq.SetElectedGasEstimate(ctx, mID, 1)
require.NoError(t, err)

ms.MetrixKeeper.On("OnConsensusMessageAttested", mock.Anything, mock.Anything).Return(nil)

err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should not do anything")
})
})

t.Run("with expected validators missing, but less than 10% share supplied evidence", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -281,10 +303,11 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(11000),
}, nil).Times(1)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.Error(t, err, "should return error")
require.ErrorContains(t, err, "message consensus failure likely caused by faulty response data")
})

t.Run("with expected validators missing, but 10% of share or more supplied evidence", func(t *testing.T) {
mID, err := keeper.PutMessageInQueue(ctx, queue, &testMsg, &consensus.PutOptions{PublicAccessData: []byte{1}})
require.NoError(t, err)
Expand Down Expand Up @@ -322,7 +345,7 @@ func TestJailValidatorsWhichMissedAttestation(t *testing.T) {
TotalShares: math.NewInt(10000),
}, nil).Times(2)

err = keeper.jailValidatorsWhichMissedAttestation(ctx, queue, mID)
err = keeper.jailValidatorsIfNecessary(ctx, queue, mID)
require.NoError(t, err, "should jail but not return error")
})
}
Expand Down
27 changes: 17 additions & 10 deletions x/consensus/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/palomachain/paloma/util/libcons"
"github.com/palomachain/paloma/util/liblog"
"github.com/palomachain/paloma/x/consensus/types"
metrixtypes "github.com/palomachain/paloma/x/metrix/types"
)

type FeeProvider interface {
Expand All @@ -31,10 +32,11 @@ type (

valset types.ValsetKeeper

registry *registry
evmKeeper types.EvmKeeper
consensusChecker *libcons.ConsensusChecker
feeProvider FeeProvider
registry *registry
evmKeeper types.EvmKeeper
consensusChecker *libcons.ConsensusChecker
feeProvider FeeProvider
onMessageAttestedListeners []metrixtypes.OnConsensusMessageAttestedListener
}
)

Expand All @@ -47,12 +49,13 @@ func NewKeeper(
fp FeeProvider,
) *Keeper {
k := &Keeper{
cdc: cdc,
storeKey: storeKey,
paramstore: ps,
valset: valsetKeeper,
registry: reg,
feeProvider: fp,
cdc: cdc,
storeKey: storeKey,
paramstore: ps,
valset: valsetKeeper,
registry: reg,
feeProvider: fp,
onMessageAttestedListeners: make([]metrixtypes.OnConsensusMessageAttestedListener, 0),
}
ider := keeperutil.NewIDGenerator(k, nil)
k.ider = ider
Expand All @@ -61,6 +64,10 @@ func NewKeeper(
return k
}

func (k *Keeper) AddMessageConsensusAttestedListener(l metrixtypes.OnConsensusMessageAttestedListener) {
k.onMessageAttestedListeners = append(k.onMessageAttestedListeners, l)
}

func (k Keeper) Logger(ctx context.Context) log.Logger {
sdkCtx := sdk.UnwrapSDKContext(ctx)
return liblog.FromSDKLogger(sdkCtx.Logger()).With("module", fmt.Sprintf("x/%s", types.ModuleName))
Expand Down
11 changes: 11 additions & 0 deletions x/consensus/keeper/keeper_setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (

type mockedServices struct {
ValsetKeeper *mocks.ValsetKeeper
MetrixKeeper *mocks.MetrixKeeper
}

func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) {
config := sdk.GetConfig()
config.SetBech32PrefixForAccount("paloma", "pub")
config.SetBech32PrefixForValidator("palomavaloper", "valoperpub")

logger := log.NewNopLogger()

storeKey := storetypes.NewKVStoreKey(types.StoreKey)
Expand Down Expand Up @@ -52,8 +57,12 @@ func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) {
memStoreKey,
"ConsensusParams",
)

metrixKeeper := mocks.NewMetrixKeeper(t)

ms := mockedServices{
ValsetKeeper: mocks.NewValsetKeeper(t),
MetrixKeeper: metrixKeeper,
}
k := NewKeeper(
appCodec,
Expand All @@ -64,6 +73,8 @@ func newConsensusKeeper(t testing.TB) (*Keeper, mockedServices, sdk.Context) {
nil,
)

k.AddMessageConsensusAttestedListener(metrixKeeper)

ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, logger)

// Initialize params
Expand Down
6 changes: 6 additions & 0 deletions x/consensus/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"
xchain "github.com/palomachain/paloma/internal/x-chain"
metrixtypes "github.com/palomachain/paloma/x/metrix/types"
valsettypes "github.com/palomachain/paloma/x/valset/types"
)

Expand Down Expand Up @@ -33,3 +34,8 @@ type ValsetKeeper interface {
type EvmKeeper interface {
PickValidatorForMessage(ctx context.Context, chainReferenceID string, requirements *xchain.JobRequirements) (string, string, error)
}

//go:generate mockery --name=MetrixKeeper
type MetrixKeeper interface {
OnConsensusMessageAttested(context.Context, metrixtypes.MessageAttestedEvent)
}
34 changes: 34 additions & 0 deletions x/consensus/types/mocks/MetrixKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a5377d2

Please sign in to comment.