diff --git a/registry/storage/shares.go b/registry/storage/shares.go index 46cb1c852f..22d8b4d1ea 100644 --- a/registry/storage/shares.go +++ b/registry/storage/shares.go @@ -58,6 +58,7 @@ type sharesStorage struct { shares map[string]*types.SSVShare validatorStore *validatorStore mu sync.RWMutex + dbmu sync.Mutex } type storageOperator struct { @@ -135,6 +136,9 @@ func (s *sharesStorage) load() error { s.mu.Lock() defer s.mu.Unlock() + s.dbmu.Lock() + defer s.dbmu.Unlock() + return s.db.GetAll(append(s.prefix, sharesPrefix...), func(i int, obj basedb.Obj) error { val := &storageShare{} if err := val.Decode(obj.Value); err != nil { @@ -159,12 +163,8 @@ func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bo } func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) { - share := s.shares[hex.EncodeToString(pubKey)] - if share == nil { - return nil, false - } - - return share, true + share, found := s.shares[hex.EncodeToString(pubKey)] + return share, found } func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare { @@ -210,25 +210,26 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er } } - s.mu.Lock() - defer s.mu.Unlock() - - return s.unsafeSave(rw, shares...) -} + err := func() error { + s.dbmu.Lock() + defer s.dbmu.Unlock() -func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVShare) error { - err := s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) { - share := specShareToStorageShare(shares[i]) - value, err := share.Encode() - if err != nil { - return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err) - } - return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil - }) + return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) { + share := specShareToStorageShare(shares[i]) + value, err := share.Encode() + if err != nil { + return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err) + } + return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil + }) + }() if err != nil { return err } + s.mu.Lock() + defer s.mu.Unlock() + updateShares := make([]*types.SSVShare, 0, len(shares)) addShares := make([]*types.SSVShare, 0, len(shares)) @@ -247,11 +248,8 @@ func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVSha if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil { return err } - if err := s.validatorStore.handleSharesAdded(addShares...); err != nil { - return err - } - return nil + return s.validatorStore.handleSharesAdded(addShares...) } func specShareToStorageShare(share *types.SSVShare) *storageShare { @@ -319,6 +317,9 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error { s.mu.Lock() defer s.mu.Unlock() + s.dbmu.Lock() + defer s.dbmu.Unlock() + // Delete the share from the database if err := s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey)); err != nil { return err @@ -364,28 +365,7 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK] } }() - saveShares := func(sshares []*types.SSVShare) error { - s.mu.Lock() - defer s.mu.Unlock() - if err := s.unsafeSave(nil, sshares...); err != nil { - return err - } - return nil - } - - // split into chunks to avoid holding the lock for too long - chunkSize := 1000 - for i := 0; i < len(shares); i += chunkSize { - end := i + chunkSize - if end > len(shares) { - end = len(shares) - } - if err := saveShares(shares[i:end]); err != nil { - return err - } - } - - return nil + return s.Save(nil, shares...) } // Drop deletes all shares. @@ -393,6 +373,9 @@ func (s *sharesStorage) Drop() error { s.mu.Lock() defer s.mu.Unlock() + s.dbmu.Lock() + defer s.dbmu.Unlock() + err := s.db.DropPrefix(bytes.Join( [][]byte{s.prefix, sharesPrefix, []byte("/")}, nil,