Skip to content

Commit

Permalink
fix: move persistence outside of lock
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Oct 7, 2024
1 parent dc0c830 commit dd28280
Showing 1 changed file with 29 additions and 42 deletions.
71 changes: 29 additions & 42 deletions registry/storage/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type sharesStorage struct {
shares map[string]*types.SSVShare
validatorStore *validatorStore
mu sync.RWMutex
dbmu sync.Mutex
}

type storageOperator struct {
Expand Down Expand Up @@ -135,6 +136,9 @@ func (s *sharesStorage) load() error {
s.mu.Lock()
defer s.mu.Unlock()

s.dbmu.Lock()
defer s.dbmu.Unlock()

return s.db.GetAll(append(s.prefix, sharesPrefix...), func(i int, obj basedb.Obj) error {
val := &storageShare{}
if err := val.Decode(obj.Value); err != nil {
Expand All @@ -159,12 +163,8 @@ func (s *sharesStorage) Get(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bo
}

func (s *sharesStorage) unsafeGet(pubKey []byte) (*types.SSVShare, bool) {
share := s.shares[hex.EncodeToString(pubKey)]
if share == nil {
return nil, false
}

return share, true
share, found := s.shares[hex.EncodeToString(pubKey)]
return share, found
}

func (s *sharesStorage) List(_ basedb.Reader, filters ...SharesFilter) []*types.SSVShare {
Expand Down Expand Up @@ -210,25 +210,26 @@ func (s *sharesStorage) Save(rw basedb.ReadWriter, shares ...*types.SSVShare) er
}
}

s.mu.Lock()
defer s.mu.Unlock()
err := func() error {
s.dbmu.Lock()
defer s.dbmu.Unlock()

return s.unsafeSave(rw, shares...)
}

func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVShare) error {
err := s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) {
share := specShareToStorageShare(shares[i])
value, err := share.Encode()
if err != nil {
return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err)
}
return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil
})
return s.db.Using(rw).SetMany(s.prefix, len(shares), func(i int) (basedb.Obj, error) {
share := specShareToStorageShare(shares[i])
value, err := share.Encode()
if err != nil {
return basedb.Obj{}, fmt.Errorf("failed to serialize share: %w", err)
}
return basedb.Obj{Key: s.storageKey(share.ValidatorPubKey[:]), Value: value}, nil
})
}()
if err != nil {
return err
}

s.mu.Lock()
defer s.mu.Unlock()

updateShares := make([]*types.SSVShare, 0, len(shares))
addShares := make([]*types.SSVShare, 0, len(shares))

Expand All @@ -247,6 +248,7 @@ func (s *sharesStorage) unsafeSave(rw basedb.ReadWriter, shares ...*types.SSVSha
if err := s.validatorStore.handleSharesUpdated(updateShares...); err != nil {
return err
}

if err := s.validatorStore.handleSharesAdded(addShares...); err != nil {
return err
}
Expand Down Expand Up @@ -319,6 +321,9 @@ func (s *sharesStorage) Delete(rw basedb.ReadWriter, pubKey []byte) error {
s.mu.Lock()
defer s.mu.Unlock()

s.dbmu.Lock()
defer s.dbmu.Unlock()

// Delete the share from the database
if err := s.db.Using(rw).Delete(s.prefix, s.storageKey(pubKey)); err != nil {
return err
Expand Down Expand Up @@ -364,35 +369,17 @@ func (s *sharesStorage) UpdateValidatorsMetadata(data map[spectypes.ValidatorPK]
}
}()

saveShares := func(sshares []*types.SSVShare) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.unsafeSave(nil, sshares...); err != nil {
return err
}
return nil
}

// split into chunks to avoid holding the lock for too long
chunkSize := 1000
for i := 0; i < len(shares); i += chunkSize {
end := i + chunkSize
if end > len(shares) {
end = len(shares)
}
if err := saveShares(shares[i:end]); err != nil {
return err
}
}

return nil
return s.Save(nil, shares...)
}

// Drop deletes all shares.
func (s *sharesStorage) Drop() error {
s.mu.Lock()
defer s.mu.Unlock()

s.dbmu.Lock()
defer s.dbmu.Unlock()

err := s.db.DropPrefix(bytes.Join(
[][]byte{s.prefix, sharesPrefix, []byte("/")},
nil,
Expand Down

0 comments on commit dd28280

Please sign in to comment.