Skip to content

Commit

Permalink
syncmap
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Oct 9, 2024
1 parent dd28280 commit d471e5e
Showing 1 changed file with 41 additions and 53 deletions.
94 changes: 41 additions & 53 deletions registry/storage/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"
"golang.org/x/exp/maps"

genesistypes "github.com/ssvlabs/ssv/protocol/genesis/types"
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
Expand Down Expand Up @@ -55,9 +54,8 @@ type sharesStorage struct {
logger *zap.Logger
db basedb.Database
prefix []byte
shares map[string]*types.SSVShare
shares *sync.Map
validatorStore *validatorStore
mu sync.RWMutex
dbmu sync.Mutex
}

Expand Down Expand Up @@ -113,7 +111,7 @@ func (s *storageShare) Decode(data []byte) error {
func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Shares, ValidatorStore, error) {
storage := &sharesStorage{
logger: logger,
shares: make(map[string]*types.SSVShare),
shares: new(sync.Map), // make(map[string]*types.SSVShare),
db: db,
prefix: prefix,
}
Expand All @@ -125,17 +123,15 @@ func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Sh
func() []*types.SSVShare { return storage.List(nil) },
func(pk []byte) (*types.SSVShare, bool) { return storage.Get(nil, pk) },
)
if err := storage.validatorStore.handleSharesAdded(maps.Values(storage.shares)...); err != nil {

if err := storage.validatorStore.handleSharesAdded(mapValues(storage.shares)...); err != nil {
return nil, nil, err
}
return storage, storage.validatorStore, nil
}

// load reads all shares from db.
func (s *sharesStorage) load() error {
s.mu.Lock()
defer s.mu.Unlock()

s.dbmu.Lock()
defer s.dbmu.Unlock()

Expand All @@ -150,53 +146,54 @@ func (s *sharesStorage) load() error {
return fmt.Errorf("failed to convert storage share to spec share: %w", err)
}

s.shares[hex.EncodeToString(val.ValidatorPubKey[:])] = share
key := hex.EncodeToString(val.ValidatorPubKey[:])
s.shares.Store(key, share)
return nil
})
}

func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bool) {
s.mu.RLock()
defer s.mu.RUnlock()

return s.unsafeGet(pubKey)
key := hex.EncodeToString(pubKey)
share, found := s.shares.Load(key)
if !found {
return nil, false
}
return share.(*types.SSVShare), true
}

func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) {
share, found := s.shares[hex.EncodeToString(pubKey)]
return share, found
}
func mapValues(m *sync.Map) (shares []*types.SSVShare) {
m.Range(func(_, value any) bool {
shares = append(shares, value.(*types.SSVShare))
return true
})

func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare {
s.mu.RLock()
defer s.mu.RUnlock()
return
}

func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) (shares []*types.SSVShare) {
if len(filters) == 0 {
return maps.Values(s.shares)
return mapValues(s.shares)
}

var shares []*types.SSVShare
Shares:
for _, share := range s.shares {
s.shares.Range(func(key, value any) bool {
share := value.(*types.SSVShare)
for _, filter := range filters {
if !filter(share) {
continue Shares
return true
}
}
shares = append(shares, share)
}
return true
})

return shares
}

func (s *sharesStorage) Range(_ basedb.Reader, fn func(*types.SSVShare) bool) {
s.mu.RLock()
defer s.mu.RUnlock()

for _, share := range s.shares {
if !fn(share) {
break
}
}
s.shares.Range(func(key, value any) bool {
share := value.(*types.SSVShare)
return fn(share)
})
}

func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) error {
Expand Down Expand Up @@ -227,22 +224,19 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er
return err
}

s.mu.Lock()
defer s.mu.Unlock()

updateShares := make([]*types.SSVShare, 0, len(shares))
addShares := make([]*types.SSVShare, 0, len(shares))

for _, share := range shares {
key := hex.EncodeToString(share.ValidatorPubKey[:])

// Update validatorStore indices.
if _, ok := s.shares[key]; ok {
if _, loaded := s.shares.Swap(key, share); loaded {
updateShares = append(updateShares, share)
} else {
addShares = append(addShares, share)
s.shares.Store(key, share)
}
s.shares[key] = share
}

if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil {
Expand Down Expand Up @@ -318,9 +312,6 @@ func (s *sharesStorage) storageShareToSpecShare(share *storageShare) (*types.SSV
}

func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
s.mu.Lock()
defer s.mu.Unlock()

s.dbmu.Lock()
defer s.dbmu.Unlock()

Expand All @@ -329,13 +320,16 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
return err
}

share := s.shares[hex.EncodeToString(pubKey)]
if share == nil {
key := hex.EncodeToString(pubKey)
val, found := s.shares.Load(key)
if !found {
return nil
}

share := val.(*types.SSVShare)

// Remove the share from local storage map
delete(s.shares, hex.EncodeToString(pubKey))
s.shares.Delete(key)

// Remove the share from the validator store. This method will handle its own locking.
if err := s.validatorStore.handleShareRemoved(share); err != nil {
Expand All @@ -350,15 +344,12 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]
var shares []*types.SSVShare

func() {
s.mu.RLock()
defer s.mu.RUnlock()

for pk, metadata := range data {
if metadata == nil {
continue
}

share, exists := s.unsafeGet(pk[:])
share, exists := s.Get(nil, pk[:])
if !exists {
continue
}
Expand All @@ -374,9 +365,6 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]

// Drop deletes all shares.
func (s *sharesStorage) Drop() error {
s.mu.Lock()
defer s.mu.Unlock()

s.dbmu.Lock()
defer s.dbmu.Unlock()

Expand All @@ -388,7 +376,7 @@ func (s *sharesStorage) Drop() error {
return err
}

s.shares = make(map[string]*types.SSVShare)
s.shares = new(sync.Map) // make(map[string]*types.SSVShare)
s.validatorStore.handleDrop()
return nil
}
Expand Down

0 comments on commit d471e5e

Please sign in to comment.