From e5058b5795089a2797407021eb945e7b1bf1269b Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 14 Oct 2022 11:10:47 +0300 Subject: [PATCH 1/8] Delete shares by operator ID instead of operator PK on operator removal (#720) * Delete shares by operator ID instead of public key on operator removal event * Setup ci/cd for hotfix-delete-shares-by-operator-id branch * Setup ci/cd for hotfix-delete-shares-by-operator-id branc * Revert "Setup ci/cd for hotfix-delete-shares-by-operator-id branch" This reverts commit 91c85fd8feeaf4e81882c47de0bcc4fa8602afd1. * TODO * basic tests Co-authored-by: stoyan.peev Co-authored-by: Amir Yahalom --- operator/validator/event_handler.go | 3 ++- operator/validator/storage.go | 25 +++++++++++++++++++++ protocol/v1/blockchain/beacon/share.go | 14 +++++++++++- protocol/v1/blockchain/beacon/share_test.go | 21 +++++++++++++++++ protocol/v1/validator/storage.go | 1 + 5 files changed, 62 insertions(+), 2 deletions(-) diff --git a/operator/validator/event_handler.go b/operator/validator/event_handler.go index 5bcec440ed..4fa4f7e425 100644 --- a/operator/validator/event_handler.go +++ b/operator/validator/event_handler.go @@ -93,6 +93,7 @@ func (c *controller) handleOperatorRemovalEvent( // } //} + // TODO: check by operator ID, not operator public key isOperatorEvent := strings.EqualFold(od.PublicKey, c.operatorPubKey) logFields := make([]zap.Field, 0) if isOperatorEvent || c.validatorOptions.FullNode { @@ -109,7 +110,7 @@ func (c *controller) handleOperatorRemovalEvent( return logFields, nil } - shares, err := c.collection.GetOperatorValidatorShares(od.PublicKey, false) + shares, err := c.collection.GetOperatorIDValidatorShares(event.OperatorId, false) if err != nil { return nil, errors.Wrap(err, "could not get all operator validator shares") } diff --git a/operator/validator/storage.go b/operator/validator/storage.go index fd530e7cbc..56f832cbd9 100644 --- a/operator/validator/storage.go +++ b/operator/validator/storage.go @@ -22,6 +22,7 @@ type ICollection interface { GetValidatorShare(key []byte) (*beaconprotocol.Share, bool, error) GetAllValidatorShares() ([]*beaconprotocol.Share, error) GetOperatorValidatorShares(operatorPubKey string, enabled bool) ([]*beaconprotocol.Share, error) + GetOperatorIDValidatorShares(operatorID uint32, enabled bool) ([]*beaconprotocol.Share, error) GetValidatorSharesByOwnerAddress(ownerAddress string) ([]*beaconprotocol.Share, error) DeleteValidatorShare(key []byte) error } @@ -147,6 +148,30 @@ func (s *Collection) GetOperatorValidatorShares(operatorPubKey string, enabled b return res, err } +// GetOperatorIDValidatorShares returns all not liquidated validator shares belongs to operator ID. +// TODO: check regards returning a slice of public keys instead of share objects +func (s *Collection) GetOperatorIDValidatorShares(operatorID uint32, enabled bool) ([]*beaconprotocol.Share, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + var res []*beaconprotocol.Share + + err := s.db.GetAll(collectionPrefix(), func(i int, obj basedb.Obj) error { + val, err := (&beaconprotocol.Share{}).Deserialize(obj.Key, obj.Value) + if err != nil { + return errors.Wrap(err, "failed to deserialize validator") + } + if !val.Liquidated || !enabled { + if ok := val.IsOperatorIDShare(uint64(operatorID)); ok { + res = append(res, val) + } + } + return nil + }) + + return res, err +} + // GetValidatorSharesByOwnerAddress returns all validator shares belongs to owner address func (s *Collection) GetValidatorSharesByOwnerAddress(ownerAddress string) ([]*beaconprotocol.Share, error) { s.lock.RLock() diff --git a/protocol/v1/blockchain/beacon/share.go b/protocol/v1/blockchain/beacon/share.go index 79d397855c..4f3a5150ef 100644 --- a/protocol/v1/blockchain/beacon/share.go +++ b/protocol/v1/blockchain/beacon/share.go @@ -5,13 +5,14 @@ import ( "crypto/sha256" "encoding/gob" "fmt" - "github.com/bloxapp/ssv/protocol/v1/types" "math" specqbft "github.com/bloxapp/ssv-spec/qbft" spectypes "github.com/bloxapp/ssv-spec/types" "github.com/herumi/bls-eth-go-binary/bls" "github.com/pkg/errors" + + "github.com/bloxapp/ssv/protocol/v1/types" ) // PubKeys defines the type for public keys object representation @@ -57,6 +58,7 @@ type serializedShare struct { } // IsOperatorShare checks whether the share belongs to operator +// TODO: probably we need to use IsOperatorIDShare instead of IsOperatorShare everywhere func (s *Share) IsOperatorShare(operatorPubKey string) bool { for _, pk := range s.Operators { if string(pk) == operatorPubKey { @@ -66,6 +68,16 @@ func (s *Share) IsOperatorShare(operatorPubKey string) bool { return false } +// IsOperatorIDShare checks whether the share belongs to operator ID +func (s *Share) IsOperatorIDShare(operatorID uint64) bool { + for _, id := range s.OperatorIds { + if id == operatorID { + return true + } + } + return false +} + // CommitteeSize returns the IBFT committee size func (s *Share) CommitteeSize() int { return len(s.Committee) diff --git a/protocol/v1/blockchain/beacon/share_test.go b/protocol/v1/blockchain/beacon/share_test.go index 2ea149bb6d..f8ca0f1448 100644 --- a/protocol/v1/blockchain/beacon/share_test.go +++ b/protocol/v1/blockchain/beacon/share_test.go @@ -75,3 +75,24 @@ func TestShare_HashOperators(t *testing.T) { hashes := share.HashOperators() require.Len(t, hashes, 4) } + +func TestShare_IsOperatorIDShare(t *testing.T) { + share := &Share{ + OperatorIds: []uint64{1, 2, 3, 4}, + } + + require.True(t, share.IsOperatorIDShare(1)) + require.False(t, share.IsOperatorIDShare(10)) +} + +func TestShare_IsOperatorShare(t *testing.T) { + share := &Share{ + Operators: [][]byte{ + {1, 1, 1, 1}, + {2, 2, 2, 2}, + }, + } + + require.True(t, share.IsOperatorShare(string([]byte{1, 1, 1, 1}))) + require.False(t, share.IsOperatorShare(string([]byte{1, 2, 3, 4}))) +} diff --git a/protocol/v1/validator/storage.go b/protocol/v1/validator/storage.go index de70a5993b..6fb0e291cb 100644 --- a/protocol/v1/validator/storage.go +++ b/protocol/v1/validator/storage.go @@ -13,6 +13,7 @@ type ICollection interface { GetValidatorShare(key []byte) (*beacon.Share, bool, error) GetAllValidatorShares() ([]*beacon.Share, error) GetOperatorValidatorShares(operatorPubKey string, enabled bool) ([]*beacon.Share, error) + GetOperatorIDValidatorShares(operatorID uint32, enabled bool) ([]*beacon.Share, error) GetValidatorSharesByOwnerAddress(ownerAddress string) ([]*beacon.Share, error) DeleteValidatorShare(key []byte) error } From d1a28048f1e7dadb14e70067bfa032c8afa5b961 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Fri, 14 Oct 2022 10:46:42 +0300 Subject: [PATCH 2/8] migration 7: clean operator removals corruptions --- ...tion_7_clean_operator_removals_corruptions.go | 16 ++++++++++++++++ migrations/migrations.go | 1 + 2 files changed, 17 insertions(+) create mode 100644 migrations/migration_7_clean_operator_removals_corruptions.go diff --git a/migrations/migration_7_clean_operator_removals_corruptions.go b/migrations/migration_7_clean_operator_removals_corruptions.go new file mode 100644 index 0000000000..62175de432 --- /dev/null +++ b/migrations/migration_7_clean_operator_removals_corruptions.go @@ -0,0 +1,16 @@ +package migrations + +import ( + "context" +) + +var migrationCleanOperatorRemovalCorruptions = Migration{ + Name: "migration_7_clean_operator_removal_corruptions", + Run: func(ctx context.Context, opt Options, key []byte) error { + nodeStorage := opt.nodeStorage() + if err := nodeStorage.CleanRegistryData(); err != nil { + return err + } + return opt.Db.Set(migrationsPrefix, key, migrationCompleted) + }, +} diff --git a/migrations/migrations.go b/migrations/migrations.go index c3cec2ad97..5d29ee32cb 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -25,6 +25,7 @@ var ( migrationCleanExporterRegistryData, migrationCleanValidatorRegistryData, migrationCleanSyncOffset, + migrationCleanOperatorRemovalCorruptions, } ) From 817c39bdeffd8e59cead1137ab91349d7de6ed82 Mon Sep 17 00:00:00 2001 From: github-actions Date: Fri, 14 Oct 2022 21:05:00 +0000 Subject: [PATCH 3/8] update code coverage badge --- docs/resources/cov-badge.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg index 0cd32894be..283ceb2650 100644 --- a/docs/resources/cov-badge.svg +++ b/docs/resources/cov-badge.svg @@ -1 +1 @@ -coverage: 48.1%coverage48.1% \ No newline at end of file +coverage: 48.7%coverage48.7% \ No newline at end of file From 8d0cfb1ce8cff02d01fa34a700c4e0bf1e7e2d23 Mon Sep 17 00:00:00 2001 From: Amir Y <83904651+amir-blox@users.noreply.github.com> Date: Tue, 18 Oct 2022 09:55:34 +0300 Subject: [PATCH 4/8] Fix nil pointer (#724) avoid creating multiple references of current instance --- protocol/v1/qbft/controller/msgqueue_consumer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/protocol/v1/qbft/controller/msgqueue_consumer.go b/protocol/v1/qbft/controller/msgqueue_consumer.go index 154ef3def1..d3f1760af3 100644 --- a/protocol/v1/qbft/controller/msgqueue_consumer.go +++ b/protocol/v1/qbft/controller/msgqueue_consumer.go @@ -133,12 +133,11 @@ func (c *Controller) processNoRunningInstance( // processByState if an instance is running -> get the state and get the relevant messages func (c *Controller) processByState(handler MessageHandler, identifier string) bool { currentInstance := c.GetCurrentInstance() - if c.GetCurrentInstance() == nil { + if currentInstance == nil { return false } var msg *spectypes.SSVMessage - currentState := currentInstance.GetState() msg = c.getNextMsgForState(currentState, identifier) if msg == nil { From 6419e14dfc2793217db25aad71efcd22d47d3da5 Mon Sep 17 00:00:00 2001 From: github-actions Date: Tue, 18 Oct 2022 07:05:58 +0000 Subject: [PATCH 5/8] update code coverage badge --- docs/resources/cov-badge.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg index 283ceb2650..4cc83cfe00 100644 --- a/docs/resources/cov-badge.svg +++ b/docs/resources/cov-badge.svg @@ -1 +1 @@ -coverage: 48.7%coverage48.7% \ No newline at end of file +coverage: 48.2%coverage48.2% \ No newline at end of file From 8553442c78609822329345b6867afe3edad30e78 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 19 Oct 2022 12:22:51 +0300 Subject: [PATCH 6/8] Fix qbft msgqueue concurrent map iteration and map write panic (#723) * Fix qbft msgqueue concurrent map iteration and map write panic * avoid indices race * Fix iterating through indices Co-authored-by: Amir Yahalom --- protocol/v1/qbft/msgqueue/queue.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/protocol/v1/qbft/msgqueue/queue.go b/protocol/v1/qbft/msgqueue/queue.go index 3ab5e5691b..f375829b26 100644 --- a/protocol/v1/qbft/msgqueue/queue.go +++ b/protocol/v1/qbft/msgqueue/queue.go @@ -2,13 +2,13 @@ package msgqueue import ( "fmt" - spectypes "github.com/bloxapp/ssv-spec/types" "strconv" "sync" "sync/atomic" spec "github.com/attestantio/go-eth2-client/spec/phase0" specqbft "github.com/bloxapp/ssv-spec/qbft" + spectypes "github.com/bloxapp/ssv-spec/types" "github.com/pkg/errors" "go.uber.org/zap" @@ -187,14 +187,24 @@ func (q *queue) Peek(n int, idx Index) []*spectypes.SSVMessage { // WithIterator looping through all indexes and return true when relevant and pop func (q *queue) WithIterator(n int, peek bool, iterator func(index Index) bool) []*spectypes.SSVMessage { + indices := make([]Index, 0, len(q.items)) + + q.itemsLock.RLock() for k := range q.items { - if iterator(k) { - if peek { - return q.Peek(n, k) - } - return q.Pop(n, k) + indices = append(indices, k) + } + q.itemsLock.RUnlock() + + for _, idx := range indices { + if !iterator(idx) { + continue } + if peek { + return q.Peek(n, idx) + } + return q.Pop(n, idx) } + return nil } From 49c531060e965c1aca5a3475a77022b33590e58b Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 19 Oct 2022 09:33:13 +0000 Subject: [PATCH 7/8] update code coverage badge --- docs/resources/cov-badge.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg index 4cc83cfe00..0cd32894be 100644 --- a/docs/resources/cov-badge.svg +++ b/docs/resources/cov-badge.svg @@ -1 +1 @@ -coverage: 48.2%coverage48.2% \ No newline at end of file +coverage: 48.1%coverage48.1% \ No newline at end of file From 91ba416b4174fb9b2cb3497a573352f22dfad7c5 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Wed, 19 Oct 2022 18:38:45 +0300 Subject: [PATCH 8/8] add log when looking for min peers --- protocol/v1/p2p/min_peers.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocol/v1/p2p/min_peers.go b/protocol/v1/p2p/min_peers.go index 76849ec8f3..20d2d6969a 100644 --- a/protocol/v1/p2p/min_peers.go +++ b/protocol/v1/p2p/min_peers.go @@ -19,9 +19,11 @@ func WaitForMinPeers(pctx context.Context, logger *zap.Logger, subscriber Subscr logger.Warn("could not get peers of topic", zap.Error(err)) continue } - if len(peers) >= minPeers { + n := len(peers) + if n >= minPeers { return nil } + logger.Debug("looking for min peers", zap.Int("expected", minPeers), zap.Int("actual", n)) } return ctx.Err()