Skip to content

Commit

Permalink
syncmap fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Oct 10, 2024
1 parent d471e5e commit e7ff883
Showing 1 changed file with 43 additions and 50 deletions.
93 changes: 43 additions & 50 deletions registry/storage/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type sharesStorage struct {
prefix []byte
shares *sync.Map
validatorStore *validatorStore
dbmu sync.Mutex
mu sync.Mutex
}

type storageOperator struct {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *storageShare) Decode(data []byte) error {
func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Shares, ValidatorStore, error) {
storage := &sharesStorage{
logger: logger,
shares: new(sync.Map), // make(map[string]*types.SSVShare),
shares: new(sync.Map),
db: db,
prefix: prefix,
}
Expand All @@ -123,7 +123,6 @@ func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Sh
func() []*types.SSVShare { return storage.List(nil) },
func(pk []byte) (*types.SSVShare, bool) { return storage.Get(nil, pk) },
)

if err := storage.validatorStore.handleSharesAdded(mapValues(storage.shares)...); err != nil {
return nil, nil, err
}
Expand All @@ -132,8 +131,8 @@ func NewSharesStorage(logger *zap.Logger, db basedb.Database, prefix []byte) (Sh

// load reads all shares from db.
func (s *sharesStorage) load() error {
s.dbmu.Lock()
defer s.dbmu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

return s.db.GetAll(append(s.prefix, sharesPrefix...), func(i int, obj basedb.Obj) error {
val := &storageShare{}
Expand Down Expand Up @@ -161,15 +160,6 @@ func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bo
return share.(*types.SSVShare), true
}

func mapValues(m *sync.Map) (shares []*types.SSVShare) {
m.Range(func(_, value any) bool {
shares = append(shares, value.(*types.SSVShare))
return true
})

return
}

func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) (shares []*types.SSVShare) {
if len(filters) == 0 {
return mapValues(s.shares)
Expand All @@ -189,6 +179,15 @@ func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) (shares [
return shares
}

func mapValues(m *sync.Map) (shares []*types.SSVShare) {
m.Range(func(_, value any) bool {
shares = append(shares, value.(*types.SSVShare))
return true
})

return
}

func (s *sharesStorage) Range(_ basedb.Reader, fn func(*types.SSVShare) bool) {
s.shares.Range(func(key, value any) bool {
share := value.(*types.SSVShare)
Expand All @@ -207,19 +206,17 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er
}
}

err := func() error {
s.dbmu.Lock()
defer s.dbmu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) {
share := specShareToStorageShare(shares[i])
value, err := share.Encode()
if err != nil {
return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err)
}
return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil
})
}()
err := s.db.SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) {
share := specShareToStorageShare(shares[i])
value, err := share.Encode()
if err != nil {
return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err)
}
return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil
})
if err != nil {
return err
}
Expand All @@ -242,7 +239,6 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er
if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil {
return err
}

if err := s.validatorStore.handleSharesAdded(addShares...); err != nil {
return err
}
Expand Down Expand Up @@ -312,25 +308,24 @@ func (s *sharesStorage) storageShareToSpecShare(share *storageShare) (*types.SSV
}

func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
s.dbmu.Lock()
defer s.dbmu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

// Delete the share from the database
if err := s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey)); err != nil {
return err
}

key := hex.EncodeToString(pubKey)
val, found := s.shares.Load(key)

// Remove the share from local storage map
val, found := s.shares.LoadAndDelete(key)
if !found {
return nil
}

share := val.(*types.SSVShare)

// Remove the share from local storage map
s.shares.Delete(key)

// Remove the share from the validator store. This method will handle its own locking.
if err := s.validatorStore.handleShareRemoved(share); err != nil {
return err
Expand All @@ -343,30 +338,28 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata) error {
var shares []*types.SSVShare

func() {
for pk, metadata := range data {
if metadata == nil {
continue
}

share, exists := s.Get(nil, pk[:])
if !exists {
continue
}
for pk, metadata := range data {
if metadata == nil {
continue
}

share.BeaconMetadata = metadata
share.Share.ValidatorIndex = metadata.Index
shares = append(shares, share)
share, exists := s.Get(nil, pk[:])
if !exists {
continue
}
}()

share.BeaconMetadata = metadata
share.Share.ValidatorIndex = metadata.Index
shares = append(shares, share)
}

return s.Save(nil, shares...)
}

// Drop deletes all shares.
func (s *sharesStorage) Drop() error {
s.dbmu.Lock()
defer s.dbmu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

err := s.db.DropPrefix(bytes.Join(
[][]byte{s.prefix, sharesPrefix, []byte("/")},
Expand All @@ -376,7 +369,7 @@ func (s *sharesStorage) Drop() error {
return err
}

s.shares = new(sync.Map) // make(map[string]*types.SSVShare)
s.shares = new(sync.Map)
s.validatorStore.handleDrop()
return nil
}
Expand Down

0 comments on commit e7ff883

Please sign in to comment.