From d471e5e00e2aa7126783dd903e9eae2b87347ea7 Mon Sep 17 00:00:00 2001 From: Anatolie Lupacescu Date: Wed, 9 Oct 2024 17:42:52 +0300 Subject: [PATCH] syncmap --- registry/storage/shares.go | 94 +++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 53 deletions(-) diff --git a/registry/storage/shares.go b/registry/storage/shares.go index d075e1fa27..cac12d02fd 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -10,7 +10,6 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" - "golang.org/x/exp/maps" genesistypes "github.com/ssvlabs/ssv/protocol/genesis/types" beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon" @@ -55,9 +54,8 @@ type sharesStorage struct { logger *zap.Logger db basedb.Database prefix []byte - shares map[string]*types.SSVShare + shares *sync.Map validatorStore *validatorStore - mu sync.RWMutex dbmu sync.Mutex } @@ -113,7 +111,7 @@ func (s *storageShare) Decode(data []byte) error { func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Shares, ValidatorStore, error) { storage := &sharesStorage{ logger: logger, - shares: make(map[string]*types.SSVShare), + shares: new(sync.Map), // make(map[string]*types.SSVShare), db: db, prefix: prefix, } @@ -125,7 +123,8 @@ func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Sh func() []*types.SSVShare { return storage.List(nil) }, func(pk []byte) (*types.SSVShare, bool) { return storage.Get(nil, pk) }, ) - if err := storage.validatorStore.handleSharesAdded(maps.Values(storage.shares)...); err != nil { + + if err := storage.validatorStore.handleSharesAdded(mapValues(storage.shares)...); err != nil { return nil, nil, err } return storage, storage.validatorStore, nil @@ -133,9 +132,6 @@ func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Sh // load reads all shares from db. func (s *sharesStorage) load() error { - s.mu.Lock() - defer s.mu.Unlock() - s.dbmu.Lock() defer s.dbmu.Unlock() @@ -150,53 +146,54 @@ func (s *sharesStorage) load() error { return fmt.Errorf("failed to convert storage share to spec share: %w", err) } - s.shares[hex.EncodeToString(val.ValidatorPubKey[:])] = share + key := hex.EncodeToString(val.ValidatorPubKey[:]) + s.shares.Store(key, share) return nil }) } func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bool) { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.unsafeGet(pubKey) + key := hex.EncodeToString(pubKey) + share, found := s.shares.Load(key) + if !found { + return nil, false + } + return share.(*types.SSVShare), true } -func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) { - share, found := s.shares[hex.EncodeToString(pubKey)] - return share, found -} +func mapValues(m *sync.Map) (shares []*types.SSVShare) { + m.Range(func(_, value any) bool { + shares = append(shares, value.(*types.SSVShare)) + return true + }) -func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare { - s.mu.RLock() - defer s.mu.RUnlock() + return +} +func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) (shares []*types.SSVShare) { if len(filters) == 0 { - return maps.Values(s.shares) + return mapValues(s.shares) } - var shares []*types.SSVShare -Shares: - for _, share := range s.shares { + s.shares.Range(func(key, value any) bool { + share := value.(*types.SSVShare) for _, filter := range filters { if !filter(share) { - continue Shares + return true } } shares = append(shares, share) - } + return true + }) + return shares } func (s *sharesStorage) Range(_ basedb.Reader, fn func(*types.SSVShare) bool) { - s.mu.RLock() - defer s.mu.RUnlock() - - for _, share := range s.shares { - if !fn(share) { - break - } - } + s.shares.Range(func(key, value any) bool { + share := value.(*types.SSVShare) + return fn(share) + }) } func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) error { @@ -227,9 +224,6 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er return err } - s.mu.Lock() - defer s.mu.Unlock() - updateShares := make([]*types.SSVShare, 0, len(shares)) addShares := make([]*types.SSVShare, 0, len(shares)) @@ -237,12 +231,12 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er key := hex.EncodeToString(share.ValidatorPubKey[:]) // Update validatorStore indices. - if _, ok := s.shares[key]; ok { + if _, loaded := s.shares.Swap(key, share); loaded { updateShares = append(updateShares, share) } else { addShares = append(addShares, share) + s.shares.Store(key, share) } - s.shares[key] = share } if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil { @@ -318,9 +312,6 @@ func (s *sharesStorage) storageShareToSpecShare(share *storageShare) (*types.SSV } func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { - s.mu.Lock() - defer s.mu.Unlock() - s.dbmu.Lock() defer s.dbmu.Unlock() @@ -329,13 +320,16 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { return err } - share := s.shares[hex.EncodeToString(pubKey)] - if share == nil { + key := hex.EncodeToString(pubKey) + val, found := s.shares.Load(key) + if !found { return nil } + share := val.(*types.SSVShare) + // Remove the share from local storage map - delete(s.shares, hex.EncodeToString(pubKey)) + s.shares.Delete(key) // Remove the share from the validator store. This method will handle its own locking. if err := s.validatorStore.handleShareRemoved(share); err != nil { @@ -350,15 +344,12 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] var shares []*types.SSVShare func() { - s.mu.RLock() - defer s.mu.RUnlock() - for pk, metadata := range data { if metadata == nil { continue } - share, exists := s.unsafeGet(pk[:]) + share, exists := s.Get(nil, pk[:]) if !exists { continue } @@ -374,9 +365,6 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] // Drop deletes all shares. func (s *sharesStorage) Drop() error { - s.mu.Lock() - defer s.mu.Unlock() - s.dbmu.Lock() defer s.dbmu.Unlock() @@ -388,7 +376,7 @@ func (s *sharesStorage) Drop() error { return err } - s.shares = make(map[string]*types.SSVShare) + s.shares = new(sync.Map) // make(map[string]*types.SSVShare) s.validatorStore.handleDrop() return nil }