From cc6f7d1402b9be88df4431a597e729e882c628b1 Mon Sep 17 00:00:00 2001 From: moshe-blox <89339422+moshe-blox@users.noreply.github.com> Date: Tue, 2 Jul 2024 16:26:44 +0300 Subject: [PATCH] fix: refetch duties after metadata update (`main`) (#1429) * fix: refetch duties after metadata update (`main`) * new metadata refresh mechanism --- go.mod | 2 +- go.sum | 4 +- operator/duties/attester.go | 5 +- operator/duties/proposer.go | 5 +- operator/duties/sync_committee.go | 5 +- operator/duties/validatorregistration.go | 4 +- operator/duties/voluntary_exit.go | 4 +- operator/validator/controller.go | 222 +++++++++++++---------- operator/validator/controller_test.go | 145 ++++++++++----- operator/validator/mocks/controller.go | 12 ++ protocol/v2/types/ssvshare.go | 12 ++ registry/storage/shares.go | 21 +++ 12 files changed, 291 insertions(+), 150 deletions(-) diff --git a/go.mod b/go.mod index 1a6e824cc6..0b7e4bccc8 100644 --- a/go.mod +++ b/go.mod @@ -228,4 +228,4 @@ replace github.com/google/flatbuffers => github.com/google/flatbuffers v1.11.0 replace github.com/dgraph-io/ristretto => github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f -replace github.com/attestantio/go-eth2-client => github.com/ssvlabs/go-eth2-client v0.0.0-20240623135036-ab92a705c625 +replace github.com/attestantio/go-eth2-client => github.com/ssvlabs/go-eth2-client v0.0.0-20240702122821-2c345f4fc90f diff --git a/go.sum b/go.sum index 6b5ab08e63..9f22be2d8c 100644 --- a/go.sum +++ b/go.sum @@ -754,8 +754,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.0.0/go.mod h1:A8kyI5cUJhb8N+3pkfONlcEcZbueH6nhAm0Fq7SrnBM= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= -github.com/ssvlabs/go-eth2-client v0.0.0-20240623135036-ab92a705c625 h1:JX8uEqeUtup8VmdCBWk7V/uZVRe3QY3/rVcCFu/5s6Y= -github.com/ssvlabs/go-eth2-client v0.0.0-20240623135036-ab92a705c625/go.mod h1:TTz7YF6w4z6ahvxKiHuGPn6DbQn7gH6HPuWm/DEQeGE= +github.com/ssvlabs/go-eth2-client v0.0.0-20240702122821-2c345f4fc90f h1:ZaokUqygFE81zqNd8mQmdrX2B8AkhYmt0i6gs9RDXi0= +github.com/ssvlabs/go-eth2-client v0.0.0-20240702122821-2c345f4fc90f/go.mod h1:TTz7YF6w4z6ahvxKiHuGPn6DbQn7gH6HPuWm/DEQeGE= github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobtDnDzA= github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/operator/duties/attester.go b/operator/duties/attester.go index f89cbaf867..e311e1b531 100644 --- a/operator/duties/attester.go +++ b/operator/duties/attester.go @@ -65,13 +65,15 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) { h.fetchNextEpoch = true + next := h.ticker.Next() for { select { case <-ctx.Done(): return - case <-h.ticker.Next(): + case <-next: slot := h.ticker.Slot() + next = h.ticker.Next() currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot) buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1) h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_seq", buildStr)) @@ -188,6 +190,7 @@ func (h *AttesterHandler) fetchAndProcessDuties(ctx context.Context, epoch phase indices := h.validatorController.CommitteeActiveIndices(epoch) if len(indices) == 0 { + h.logger.Debug("no active validators for epoch", fields.Epoch(epoch)) return nil } diff --git a/operator/duties/proposer.go
b/operator/duties/proposer.go index 37c4c1b4bd..6f81f434c9 100644 --- a/operator/duties/proposer.go +++ b/operator/duties/proposer.go @@ -54,13 +54,15 @@ func (h *ProposerHandler) Name() string { func (h *ProposerHandler) HandleDuties(ctx context.Context) { h.logger.Info("starting duty handler") + next := h.ticker.Next() for { select { case <-ctx.Done(): return - case <-h.ticker.Next(): + case <-next: slot := h.ticker.Slot() + next = h.ticker.Next() currentEpoch := h.network.Beacon.EstimatedEpochAtSlot(slot) buildStr := fmt.Sprintf("e%v-s%v-#%v", currentEpoch, slot, slot%32+1) h.logger.Debug("🛠 ticker event", zap.String("epoch_slot_seq", buildStr)) @@ -148,6 +150,7 @@ func (h *ProposerHandler) fetchAndProcessDuties(ctx context.Context, epoch phase allIndices := h.validatorController.AllActiveIndices(epoch, false) if len(allIndices) == 0 { + h.logger.Debug("no active validators for epoch", fields.Epoch(epoch)) return nil } diff --git a/operator/duties/sync_committee.go b/operator/duties/sync_committee.go index 2b4b7c7d3c..38f50b9037 100644 --- a/operator/duties/sync_committee.go +++ b/operator/duties/sync_committee.go @@ -67,13 +67,15 @@ func (h *SyncCommitteeHandler) HandleDuties(ctx context.Context) { h.fetchNextPeriod = true } + next := h.ticker.Next() for { select { case <-ctx.Done(): return - case <-h.ticker.Next(): + case <-next: slot := h.ticker.Slot() + next = h.ticker.Next() epoch := h.network.Beacon.EstimatedEpochAtSlot(slot) period := h.network.Beacon.EstimatedSyncCommitteePeriodAtEpoch(epoch) buildStr := fmt.Sprintf("p%v-%v-s%v-#%v", period, epoch, slot, slot%32+1) @@ -196,6 +198,7 @@ func (h *SyncCommitteeHandler) fetchAndProcessDuties(ctx context.Context, period allActiveIndices := h.validatorController.AllActiveIndices(firstEpoch, waitForInitial) if len(allActiveIndices) == 0 { + h.logger.Debug("no active validators for period", fields.Epoch(currentEpoch), zap.Uint64("period", period)) return nil } diff --git a/operator/duties/validatorregistration.go b/operator/duties/validatorregistration.go index 4d4b36ceef..add49298ef 100644 --- a/operator/duties/validatorregistration.go +++ b/operator/duties/validatorregistration.go @@ -28,13 +28,15 @@ func (h *ValidatorRegistrationHandler) HandleDuties(ctx context.Context) { // should be registered within validatorRegistrationEpochInterval epochs time in a corresponding slot registrationSlotInterval := h.network.SlotsPerEpoch() * validatorRegistrationEpochInterval + next := h.ticker.Next() for { select { case <-ctx.Done(): return - case <-h.ticker.Next(): + case <-next: slot := h.ticker.Slot() + next = h.ticker.Next() epoch := h.network.Beacon.EstimatedEpochAtSlot(slot) shares := h.validatorController.GetOperatorShares() diff --git a/operator/duties/voluntary_exit.go b/operator/duties/voluntary_exit.go index 331bd3a539..298c68d794 100644 --- a/operator/duties/voluntary_exit.go +++ b/operator/duties/voluntary_exit.go @@ -41,13 +41,15 @@ func (h *VoluntaryExitHandler) Name() string { func (h *VoluntaryExitHandler) HandleDuties(ctx context.Context) { h.logger.Info("starting duty handler") + next := h.ticker.Next() for { select { case <-ctx.Done(): return - case <-h.ticker.Next(): + case <-next: currentSlot := h.ticker.Slot() + next = h.ticker.Next() h.logger.Debug("🛠 ticker event", fields.Slot(currentSlot)) diff --git a/operator/validator/controller.go b/operator/validator/controller.go index ed5f469707..c83ce3df8b 100644 --- a/operator/validator/controller.go +++ b/operator/validator/controller.go @@ -127,6 +127,7 @@ type Recipients 
interface { type SharesStorage interface { Get(txn basedb.Reader, pubKey []byte) *types.SSVShare List(txn basedb.Reader, filters ...registrystorage.SharesFilter) []*types.SSVShare + Range(txn basedb.Reader, fn func(*types.SSVShare) bool) UpdateValidatorMetadata(pk string, metadata *beaconprotocol.ValidatorMetadata) error } @@ -396,56 +397,63 @@ func (c *controller) handleWorkerMessages(msg *queue.DecodedSSVMessage) error { // StartValidators loads all persisted shares and setup the corresponding validators func (c *controller) StartValidators() { - if c.validatorOptions.Exporter { - // There are no committee validators to setup. - close(c.committeeValidatorSetup) - - // Setup non-committee validators. - c.setupNonCommitteeValidators() - return - } - + // Load non-liquidated shares. shares := c.sharesStorage.List(nil, registrystorage.ByNotLiquidated()) if len(shares) == 0 { + close(c.committeeValidatorSetup) c.logger.Info("could not find validators") return } - var ownShares []*ssvtypes.SSVShare var allPubKeys = make([][]byte, 0, len(shares)) for _, share := range shares { - if share.BelongsToOperator(c.operatorDataStore.GetOperatorID()) { + if c.operatorDataStore.GetOperatorID() != 0 && + share.BelongsToOperator(c.operatorDataStore.GetOperatorID()) { ownShares = append(ownShares, share) } allPubKeys = append(allPubKeys, share.ValidatorPubKey) } - // Setup committee validators. - inited := c.setupValidators(ownShares) - if len(inited) == 0 { - // If no validators were started and therefore we're not subscribed to any subnets, - // then subscribe to a random subnet to participate in the network. - if err := c.network.SubscribeRandoms(c.logger, 1); err != nil { - c.logger.Error("failed to subscribe to random subnets", zap.Error(err)) + if c.validatorOptions.Exporter { + // There are no committee validators to setup. + close(c.committeeValidatorSetup) + } else { + // Setup committee validators. + inited := c.setupValidators(ownShares) + if len(inited) == 0 { + // If no validators were started and therefore we're not subscribed to any subnets, + // then subscribe to a random subnet to participate in the network. + if err := c.network.SubscribeRandoms(c.logger, 1); err != nil { + c.logger.Error("failed to subscribe to random subnets", zap.Error(err)) + } } - } - close(c.committeeValidatorSetup) + close(c.committeeValidatorSetup) - // Start validators. - c.startValidators(inited) + // Start validators. + c.startValidators(inited) + } - // Fetch metadata for all validators. - start := time.Now() - err := beaconprotocol.UpdateValidatorsMetadata(c.logger, allPubKeys, c, c.beacon, c.onMetadataUpdated) - if err != nil { - c.logger.Error("failed to update validators metadata after setup", - zap.Int("shares", len(allPubKeys)), - fields.Took(time.Since(start)), - zap.Error(err)) - } else { - c.logger.Debug("updated validators metadata after setup", - zap.Int("shares", len(allPubKeys)), - fields.Took(time.Since(start))) + // Fetch metadata now if there is none. Otherwise, UpdateValidatorMetaDataLoop will handle it.
+ var hasMetadata bool + for _, share := range shares { + if !share.Liquidated && share.HasBeaconMetadata() { + hasMetadata = true + break + } + } + if !hasMetadata { + start := time.Now() + err := c.updateValidatorsMetadata(c.logger, allPubKeys, c, c.beacon, c.onMetadataUpdated) + if err != nil { + c.logger.Error("failed to update validators metadata after setup", + zap.Int("shares", len(allPubKeys)), + fields.Took(time.Since(start)), + zap.Error(err)) + } else { + c.logger.Debug("updated validators metadata after setup", + zap.Int("shares", len(allPubKeys)), + fields.Took(time.Since(start))) + } } } @@ -501,28 +509,6 @@ func (c *controller) startValidators(validators []*validator.Validator) int { return started } -// setupNonCommitteeValidators trigger SyncHighestDecided for each validator -// to start consensus flow which would save the highest decided instance -// and sync any gaps (in protocol/v2/qbft/controller/decided.go). -func (c *controller) setupNonCommitteeValidators() { - nonCommitteeShares := c.sharesStorage.List(nil, registrystorage.ByNotLiquidated()) - if len(nonCommitteeShares) == 0 { - c.logger.Info("could not find non-committee validators") - return - } - - pubKeys := make([][]byte, 0, len(nonCommitteeShares)) - for _, validatorShare := range nonCommitteeShares { - pubKeys = append(pubKeys, validatorShare.ValidatorPubKey) - } - if len(pubKeys) > 0 { - c.logger.Debug("updating metadata for non-committee validators", zap.Int("count", len(pubKeys))) - if err := beaconprotocol.UpdateValidatorsMetadata(c.logger, pubKeys, c, c.beacon, c.onMetadataUpdated); err != nil { - c.logger.Warn("could not update all validators", zap.Error(err)) - } - } -} - // StartNetworkHandlers init msg worker that handles network messages func (c *controller) StartNetworkHandlers() { // first, set stream handlers @@ -538,8 +524,6 @@ func (c *controller) StartNetworkHandlers() { // UpdateValidatorMetadata updates a given validator with metadata (implements ValidatorMetadataStorage) func (c *controller) UpdateValidatorMetadata(pk string, metadata *beaconprotocol.ValidatorMetadata) error { - c.metadataLastUpdated[pk] = time.Now() - if metadata == nil { return errors.New("could not update empty metadata") } @@ -653,11 +637,13 @@ func (c *controller) AllActiveIndices(epoch phase0.Epoch, afterInit bool) []phas if afterInit { <-c.committeeValidatorSetup } - shares := c.sharesStorage.List(nil, registrystorage.ByAttesting(epoch)) - indices := make([]phase0.ValidatorIndex, len(shares)) - for i, share := range shares { - indices[i] = share.BeaconMetadata.Index - } + var indices []phase0.ValidatorIndex + c.sharesStorage.Range(nil, func(share *ssvtypes.SSVShare) bool { + if share.IsAttesting(epoch) { + indices = append(indices, share.BeaconMetadata.Index) + } + return true + }) return indices } @@ -803,37 +789,45 @@ func (c *controller) startValidator(v *validator.Validator) (bool, error) { // UpdateValidatorMetaDataLoop updates metadata of validators in an interval func (c *controller) UpdateValidatorMetaDataLoop() { - var interval = c.beacon.GetBeaconNetwork().SlotDurationSec() * 2 - - // Prepare share filters. - filters := []registrystorage.SharesFilter{} - - // Filter for validators who are not liquidated. - filters = append(filters, registrystorage.ByNotLiquidated()) - - // Filter for validators which haven't been updated recently. 
- filters = append(filters, func(s *ssvtypes.SSVShare) bool { - last, ok := c.metadataLastUpdated[string(s.ValidatorPubKey)] - return !ok || time.Since(last) > c.metadataUpdateInterval - }) + const batchSize = 512 + var sleep = 2 * time.Second for { - time.Sleep(interval) - start := time.Now() - // Get the shares to fetch metadata for. - shares := c.sharesStorage.List(nil, filters...) - var pks [][]byte + start := time.Now() + var existingShares, newShares []*ssvtypes.SSVShare + c.sharesStorage.Range(nil, func(share *ssvtypes.SSVShare) bool { + if share.Liquidated { + return true + } + if share.BeaconMetadata == nil && share.MetadataLastUpdated().IsZero() { + newShares = append(newShares, share) + } else if time.Since(share.MetadataLastUpdated()) > c.metadataUpdateInterval { + existingShares = append(existingShares, share) + } + return len(newShares) < batchSize + }) + + // Combine validators up to batchSize, prioritizing the new ones. + shares := newShares + if remainder := batchSize - len(shares); remainder > 0 { + end := remainder + if end > len(existingShares) { + end = len(existingShares) + } + shares = append(shares, existingShares[:end]...) + } for _, share := range shares { - pks = append(pks, share.ValidatorPubKey) - c.metadataLastUpdated[string(share.ValidatorPubKey)] = time.Now() + share.SetMetadataLastUpdated(time.Now()) } + filteringTook := time.Since(start) - // TODO: continue if there is nothing to update. - - c.recentlyStartedValidators = 0 - if len(pks) > 0 { - err := beaconprotocol.UpdateValidatorsMetadata(c.logger, pks, c, c.beacon, c.onMetadataUpdated) + if len(shares) > 0 { + pubKeys := make([][]byte, len(shares)) + for i, s := range shares { + pubKeys[i] = s.ValidatorPubKey + } + err := c.updateValidatorsMetadata(c.logger, pubKeys, c, c.beacon, c.onMetadataUpdated) if err != nil { c.logger.Warn("failed to update validators metadata", zap.Error(err)) continue @@ -841,18 +835,56 @@ func (c *controller) UpdateValidatorMetaDataLoop() { } c.logger.Debug("updated validators metadata", zap.Int("validators", len(shares)), + zap.Int("new_validators", len(newShares)), zap.Uint64("started_validators", c.recentlyStartedValidators), + zap.Duration("filtering_took", filteringTook), fields.Took(time.Since(start))) - // Notify DutyScheduler of new validators. - if c.recentlyStartedValidators > 0 { - select { - case c.indicesChange <- struct{}{}: - case <-time.After(interval): - c.logger.Warn("timed out while notifying DutyScheduler of new validators") - } + // Only sleep if there aren't more validators to fetch metadata for. + if len(shares) < batchSize { + time.Sleep(sleep) + } + } +} + +func (c *controller) updateValidatorsMetadata(logger *zap.Logger, pks [][]byte, storage beaconprotocol.ValidatorMetadataStorage, beacon beaconprotocol.BeaconNode, onMetadataUpdated func(pk string, meta *beaconprotocol.ValidatorMetadata)) error { + // Fetch metadata for all validators. + c.recentlyStartedValidators = 0 + beforeUpdate := c.AllActiveIndices(c.beacon.GetBeaconNetwork().EstimatedCurrentEpoch(), false) + + err := beaconprotocol.UpdateValidatorsMetadata(logger, pks, storage, beacon, onMetadataUpdated) + if err != nil { + return errors.Wrap(err, "failed to update validators metadata") + } + + // Refresh duties if there are any new active validators. 
+ afterUpdate := c.AllActiveIndices(c.beacon.GetBeaconNetwork().EstimatedCurrentEpoch(), false) + if c.recentlyStartedValidators > 0 || hasNewValidators(beforeUpdate, afterUpdate) { + c.logger.Debug("new validators found after metadata update", + zap.Int("before", len(beforeUpdate)), + zap.Int("after", len(afterUpdate)), + zap.Uint64("started_validators", c.recentlyStartedValidators), + ) + select { + case c.indicesChange <- struct{}{}: + case <-time.After(2 * c.beacon.GetBeaconNetwork().SlotDurationSec()): + c.logger.Warn("timed out while notifying DutyScheduler of new validators") + } + } + return nil +} + +func hasNewValidators(before []phase0.ValidatorIndex, after []phase0.ValidatorIndex) bool { + m := make(map[phase0.ValidatorIndex]struct{}) + for _, v := range before { + m[v] = struct{}{} + } + for _, v := range after { + if _, ok := m[v]; !ok { + return true } } + return false } // SetupRunners initializes duty runners for the given validator diff --git a/operator/validator/controller_test.go b/operator/validator/controller_test.go index f57bc930f1..e8c33e912f 100644 --- a/operator/validator/controller_test.go +++ b/operator/validator/controller_test.go @@ -87,11 +87,11 @@ func TestNewController(t *testing.T) { require.IsType(t, &controller{}, control) } -func TestSetupNonCommitteeValidators(t *testing.T) { +func TestSetupValidatorsExporter(t *testing.T) { passedEpoch := phase0.Epoch(1) operators := buildOperators(t) - operatorDataStore := operatordatastore.New(buildOperatorData(1, "67Ce5c69260bd819B4e0AD13f4b873074D479811")) + operatorDataStore := operatordatastore.New(buildOperatorData(0, "67Ce5c69260bd819B4e0AD13f4b873074D479811")) recipientData := buildFeeRecipient("67Ce5c69260bd819B4e0AD13f4b873074D479811", "45E668aba4b7fc8761331EC3CE77584B7A99A51A") secretKey := &bls.SecretKey{} @@ -99,18 +99,8 @@ func TestSetupNonCommitteeValidators(t *testing.T) { require.NoError(t, secretKey.SetHexString(sk1Str)) require.NoError(t, secretKey2.SetHexString(sk2Str)) - firstValidator := &validator.Validator{ - DutyRunners: runner.DutyRunners{}, - Storage: ibftstorage.NewStores(), - Share: &types.SSVShare{ - Share: spectypes.Share{ - ValidatorPubKey: secretKey.GetPublicKey().Serialize(), - }, - }, - } - bcResponse := map[phase0.ValidatorIndex]*eth2apiv1.Validator{ - 0: { + 2: { Balance: 0, Status: 3, Index: 2, @@ -119,9 +109,18 @@ func TestSetupNonCommitteeValidators(t *testing.T) { PublicKey: phase0.BLSPubKey(secretKey.GetPublicKey().Serialize()), }, }, + 3: { + Balance: 0, + Status: 3, + Index: 3, + Validator: &phase0.Validator{ + ActivationEpoch: passedEpoch, + PublicKey: phase0.BLSPubKey(secretKey2.GetPublicKey().Serialize()), + }, + }, } - sharesSlice := []*types.SSVShare{ + sharesWithMetadata := []*types.SSVShare{ { Share: spectypes.Share{ OperatorID: 1, @@ -155,35 +154,83 @@ func TestSetupNonCommitteeValidators(t *testing.T) { }, }, } + _ = sharesWithMetadata + + sharesWithoutMetadata := []*types.SSVShare{ + { + Share: spectypes.Share{ + OperatorID: 1, + Committee: operators, + ValidatorPubKey: secretKey.GetPublicKey().Serialize(), + }, + Metadata: types.Metadata{ + Liquidated: false, + }, + }, + { + Share: spectypes.Share{ + OperatorID: 2, + Committee: operators, + ValidatorPubKey: secretKey2.GetPublicKey().Serialize(), + }, + Metadata: types.Metadata{ + Liquidated: false, + }, + }, + } + _ = sharesWithoutMetadata testCases := []struct { name string shareStorageListResponse []*types.SSVShare + expectMetadataFetch bool syncHighestDecidedResponse error getValidatorDataResponse error }{ - 
{"no shares of non committee", nil, nil, nil}, - {"set up non committee validators", sharesSlice, nil, nil}, - {"fail to sync highest decided", sharesSlice, errors.New("failed to sync highest decided"), nil}, - {"fail to update validators metadata", sharesSlice, nil, errors.New("could not update all validators")}, + {"no shares of non committee", nil, false, nil, nil}, + {"set up non committee validators", sharesWithMetadata, false, nil, nil}, + {"set up non committee validators without metadata", sharesWithoutMetadata, true, nil, nil}, + {"fail to sync highest decided", sharesWithMetadata, false, errors.New("failed to sync highest decided"), nil}, + {"fail to update validators metadata", sharesWithMetadata, false, nil, errors.New("could not update all validators")}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctrl, logger, sharesStorage, network, _, recipientStorage, bc := setupCommonTestComponents(t) defer ctrl.Finish() - testValidatorsMap := map[string]*validator.Validator{ - secretKey.GetPublicKey().SerializeToHexStr(): firstValidator, - } - mockValidatorsMap := validatorsmap.New(context.TODO(), validatorsmap.WithInitialState(testValidatorsMap)) + mockValidatorsMap := validatorsmap.New(context.TODO()) if tc.shareStorageListResponse == nil { sharesStorage.EXPECT().List(gomock.Any(), gomock.Any()).Return(tc.shareStorageListResponse).Times(1) } else { - sharesStorage.EXPECT().Get(gomock.Any(), gomock.Any()).Return(sharesSlice[0]).AnyTimes() - bc.EXPECT().GetValidatorData(gomock.Any()).Return(bcResponse, tc.getValidatorDataResponse).Times(1) + sharesStorage.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn(func(_ basedb.Reader, pubKey []byte) *types.SSVShare { + for _, share := range tc.shareStorageListResponse { + if hex.EncodeToString(share.Share.ValidatorPubKey) == hex.EncodeToString(pubKey) { + return share + } + } + return nil + }).AnyTimes() sharesStorage.EXPECT().List(gomock.Any(), gomock.Any()).Return(tc.shareStorageListResponse).Times(1) - sharesStorage.EXPECT().UpdateValidatorMetadata(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + sharesStorage.EXPECT().Range(gomock.Any(), gomock.Any()).DoAndReturn(func(_ basedb.Reader, fn func(*types.SSVShare) bool) { + for _, share := range tc.shareStorageListResponse { + if !fn(share) { + break + } + } + }).AnyTimes() + if tc.expectMetadataFetch { + bc.EXPECT().GetValidatorData(gomock.Any()).Return(bcResponse, tc.getValidatorDataResponse).Times(1) + sharesStorage.EXPECT().UpdateValidatorMetadata(gomock.Any(), gomock.Any()).DoAndReturn(func(pk string, metadata *beacon.ValidatorMetadata) error { + for _, share := range tc.shareStorageListResponse { + if hex.EncodeToString(share.Share.ValidatorPubKey) == pk { + share.Metadata.BeaconMetadata = metadata + } + } + return nil + }).Times(len(tc.shareStorageListResponse)) + bc.EXPECT().GetBeaconNetwork().Return(networkconfig.Mainnet.Beacon.GetBeaconNetwork()).AnyTimes() + } recipientStorage.EXPECT().GetRecipientData(gomock.Any(), gomock.Any()).Return(recipientData, true, nil).Times(0) } @@ -191,19 +238,21 @@ func TestSetupNonCommitteeValidators(t *testing.T) { return true, nil } controllerOptions := MockControllerOptions{ - beacon: bc, - network: network, - operatorDataStore: operatorDataStore, - sharesStorage: sharesStorage, - recipientsStorage: recipientStorage, - validatorsMap: mockValidatorsMap, - validatorOptions: validator.Options{}, + beacon: bc, + network: network, + operatorDataStore: operatorDataStore, + sharesStorage: sharesStorage, + recipientsStorage: 
recipientStorage, + validatorsMap: mockValidatorsMap, + validatorOptions: validator.Options{ + Exporter: true, + }, metrics: validator.NopMetrics{}, metadataLastUpdated: map[string]time.Time{}, } ctr := setupController(logger, controllerOptions) ctr.validatorStartFunc = validatorStartFunc - ctr.setupNonCommitteeValidators() + ctr.StartValidators() }) } } @@ -1011,20 +1060,22 @@ func TestGetIndices(t *testing.T) { func setupController(logger *zap.Logger, opts MockControllerOptions) controller { return controller{ - metadataUpdateInterval: 0, - logger: logger, - beacon: opts.beacon, - network: opts.network, - metrics: opts.metrics, - keyManager: opts.keyManager, - ibftStorageMap: opts.StorageMap, - operatorDataStore: opts.operatorDataStore, - sharesStorage: opts.sharesStorage, - validatorsMap: opts.validatorsMap, - context: context.Background(), - validatorOptions: opts.validatorOptions, - recipientsStorage: opts.recipientsStorage, - messageRouter: newMessageRouter(logger), + metadataUpdateInterval: 0, + logger: logger, + beacon: opts.beacon, + network: opts.network, + metrics: opts.metrics, + keyManager: opts.keyManager, + ibftStorageMap: opts.StorageMap, + operatorDataStore: opts.operatorDataStore, + sharesStorage: opts.sharesStorage, + validatorsMap: opts.validatorsMap, + context: context.Background(), + validatorOptions: opts.validatorOptions, + recipientsStorage: opts.recipientsStorage, + messageRouter: newMessageRouter(logger), + committeeValidatorSetup: make(chan struct{}), + indicesChange: make(chan struct{}, 32), messageWorker: worker.NewWorker(logger, &worker.Config{ Ctx: context.Background(), WorkersCount: 1, diff --git a/operator/validator/mocks/controller.go b/operator/validator/mocks/controller.go index 9cdcd0bf0f..497d202dc6 100644 --- a/operator/validator/mocks/controller.go +++ b/operator/validator/mocks/controller.go @@ -375,6 +375,18 @@ func (mr *MockSharesStorageMockRecorder) List(txn interface{}, filters ...interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockSharesStorage)(nil).List), varargs...) } +// Range mocks base method. +func (m *MockSharesStorage) Range(txn basedb.Reader, fn func(*types0.SSVShare) bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Range", txn, fn) +} + +// Range indicates an expected call of Range. +func (mr *MockSharesStorageMockRecorder) Range(txn, fn interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockSharesStorage)(nil).Range), txn, fn) +} + // UpdateValidatorMetadata mocks base method. func (m *MockSharesStorage) UpdateValidatorMetadata(pk string, metadata *beacon.ValidatorMetadata) error { m.ctrl.T.Helper() diff --git a/protocol/v2/types/ssvshare.go b/protocol/v2/types/ssvshare.go index 987b95b651..1c686998cc 100644 --- a/protocol/v2/types/ssvshare.go +++ b/protocol/v2/types/ssvshare.go @@ -6,6 +6,7 @@ import ( "encoding/gob" "fmt" "sort" + "time" eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/bellatrix" @@ -107,4 +108,15 @@ type Metadata struct { BeaconMetadata *beaconprotocol.ValidatorMetadata OwnerAddress common.Address Liquidated bool + + // lastUpdated is an internal field that can be used to track the last time the metadata was updated. 
+ lastUpdated time.Time +} + +func (m *Metadata) MetadataLastUpdated() time.Time { + return m.lastUpdated +} + +func (m *Metadata) SetMetadataLastUpdated(t time.Time) { + m.lastUpdated = t } diff --git a/registry/storage/shares.go b/registry/storage/shares.go index 522ff7540a..41431ce05f 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "sync" + "time" "github.com/attestantio/go-eth2-client/spec/phase0" spectypes "github.com/bloxapp/ssv-spec/types" @@ -32,6 +33,10 @@ type Shares interface { // List returns a list of shares, filtered by the given filters (if any). List(txn basedb.Reader, filters ...SharesFilter) []*types.SSVShare + // Range calls the given function over each share. + // If the function returns false, the iteration stops. + Range(txn basedb.Reader, fn func(*types.SSVShare) bool) + // Save saves the given shares. Save(txn basedb.ReadWriter, shares ...*types.SSVShare) error @@ -110,6 +115,17 @@ Shares: return shares } +func (s *sharesStorage) Range(_ basedb.Reader, fn func(*types.SSVShare) bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, share := range s.shares { + if !fn(share) { + break + } + } +} + func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) error { if len(shares) == 0 { return nil @@ -150,6 +166,10 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { // UpdateValidatorMetadata updates the metadata of the given validator func (s *sharesStorage) UpdateValidatorMetadata(pk string, metadata *beaconprotocol.ValidatorMetadata) error { + if metadata == nil { + return nil + } + key, err := hex.DecodeString(pk) if err != nil { return err @@ -159,6 +179,7 @@ func (s *sharesStorage) UpdateValidatorMetadata(pk string, metadata *beaconproto return nil } + share.SetMetadataLastUpdated(time.Now()) share.BeaconMetadata = metadata return s.Save(nil, share) }
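A note on the ticker change repeated across attester.go, proposer.go, sync_committee.go, validatorregistration.go, and voluntary_exit.go: Next() is now armed once before the loop and re-armed immediately after Slot() is read, so a slot that fires while the handler body is still busy is picked up on the next iteration rather than dropped by a channel that is only created afterwards. A minimal sketch of the pattern; the SlotTicker interface here is an assumption standing in for the project's actual ticker API:

package main

import (
	"context"
	"time"

	"github.com/attestantio/go-eth2-client/spec/phase0"
)

// SlotTicker is a hypothetical stand-in for the project's slot ticker.
type SlotTicker interface {
	Next() <-chan time.Time // channel that fires at the next slot boundary
	Slot() phase0.Slot      // slot corresponding to the tick that last fired
}

func handleDuties(ctx context.Context, ticker SlotTicker, process func(phase0.Slot)) {
	next := ticker.Next() // arm once, before entering the loop
	for {
		select {
		case <-ctx.Done():
			return
		case <-next:
			slot := ticker.Slot()
			next = ticker.Next() // re-arm before any slow work so the next tick isn't missed
			process(slot)
		}
	}
}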
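Usage sketch for the new SharesStorage.Range added above: unlike List, it streams shares through a callback under the storage read lock and supports early exit, which is what the rewritten AllActiveIndices relies on to avoid building an intermediate filtered slice:

// Collect attesting indices for an epoch, mirroring AllActiveIndices above.
var indices []phase0.ValidatorIndex
c.sharesStorage.Range(nil, func(share *ssvtypes.SSVShare) bool {
	if share.IsAttesting(epoch) {
		indices = append(indices, share.BeaconMetadata.Index)
	}
	return true // returning false would stop the iteration early
})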
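The rewritten UpdateValidatorMetaDataLoop replaces List-with-filters by a Range pass that buckets shares into never-fetched and stale, then builds a bounded batch. A condensed sketch of that selection policy, using a pared-down share struct instead of types.SSVShare to stay self-contained:

package main

import "time"

const batchSize = 512 // same cap as the patch

// share is a pared-down stand-in for types.SSVShare.
type share struct {
	liquidated  bool
	hasMetadata bool
	lastUpdated time.Time
}

// nextBatch prioritizes shares that never had metadata fetched; shares whose
// metadata is merely stale fill whatever room is left, capped at batchSize.
func nextBatch(all []*share, staleAfter time.Duration) []*share {
	var newShares, staleShares []*share
	for _, s := range all {
		if s.liquidated {
			continue
		}
		if !s.hasMetadata && s.lastUpdated.IsZero() {
			newShares = append(newShares, s)
			if len(newShares) >= batchSize {
				break // a full batch of new shares; stop scanning, like the Range early exit
			}
		} else if time.Since(s.lastUpdated) > staleAfter {
			staleShares = append(staleShares, s)
		}
	}
	batch := newShares
	if rem := batchSize - len(batch); rem > 0 {
		if rem > len(staleShares) {
			rem = len(staleShares)
		}
		batch = append(batch, staleShares[:rem]...)
	}
	return batch
}

A full batch is the signal that more work is probably pending, which is why the loop in the patch only sleeps its 2 seconds when len(shares) < batchSize.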
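Worth noting about hasNewValidators above: it is a one-sided set difference, reporting only indices present in after but absent from before, so a removal alone does not trigger a duty refetch here (the refetch also fires whenever recentlyStartedValidators > 0). For example:

before := []phase0.ValidatorIndex{1, 2, 3}
fmt.Println(hasNewValidators(before, []phase0.ValidatorIndex{2, 3}))    // false: removal only
fmt.Println(hasNewValidators(before, []phase0.ValidatorIndex{2, 3, 7})) // true: index 7 is new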