feat: shard aware pruning of peer store
chaitanyaprem committed Aug 14, 2024
1 parent 3b2cde8 commit 62dea88
Showing 12 changed files with 114 additions and 28 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ rlnKeystore.json
test_onchain.json
*.bkp
*.log
+.vscode

# sqlite db
*.db
4 changes: 2 additions & 2 deletions waku/v2/node/wakunode2.go
@@ -752,7 +752,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info)
if err != nil {
-		w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info)
+		w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID)
return err
}

@@ -770,7 +770,7 @@ func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
}
}

-	w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info)
+	w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info.ID)

w.metrics.RecordDial()

3 changes: 2 additions & 1 deletion waku/v2/peermanager/peer_connector.go
@@ -279,8 +279,9 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.addConnectionBackoff(pi.ID)
-		c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
+		c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID)
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
+	c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
<-sem
}
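Note: dialPeer bounds dial concurrency with the sem channel it drains on exit. A minimal, self-contained sketch of that counting-semaphore pattern (simplified names, not go-waku's actual dialer):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxConcurrentDials = 3
	sem := make(chan struct{}, maxConcurrentDials) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		sem <- struct{}{} // acquire a slot; blocks while 3 dials are in flight
		wg.Add(1)
		go func(peerNum int) {
			defer wg.Done()
			fmt.Printf("dialing peer %d\n", peerNum)
			<-sem // release the slot, mirroring dialPeer's final <-sem
		}(i)
	}
	wg.Wait()
}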
102 changes: 93 additions & 9 deletions waku/v2/peermanager/peer_manager.go
@@ -101,6 +101,8 @@
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")

+const maxFailedAttempts = 5
+const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5

@@ -241,6 +243,96 @@ func (pm *PeerManager) Start(ctx context.Context) {
go pm.peerEventLoop(ctx)
}
go pm.connectivityLoop(ctx)
+	go pm.peerStoreLoop(ctx)
}
+
+func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
+	t := time.NewTicker(prunePeerStoreInterval)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			pm.prunePeerStore()
+		}
+	}
+}
+
+func (pm *PeerManager) prunePeerStore() {
+	peers := pm.host.Peerstore().Peers()
+	numPeers := len(peers)
+	if numPeers < pm.maxPeers {
+		return
+	}
+	peerCntBeforePruning := numPeers
+	pm.logger.Debug("peerstore capacity exceeded, hence pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", peerCntBeforePruning))
+
+	for _, peerID := range peers {
+		connFailures := pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID)
+		if connFailures > maxFailedAttempts {
+			// safety check so that we don't end up disconnecting connected peers.
+			if pm.host.Network().Connectedness(peerID) == network.Connected {
+				pm.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(peerID)
+				continue
+			}
+			pm.host.Peerstore().RemovePeer(peerID)
+			numPeers--
+		}
+		if numPeers < pm.maxPeers {
+			pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
+			return
+		}
+	}
+
+	notConnectedPeers := pm.getPeersBasedOnconnectionStatus("", network.NotConnected)
+	peersByTopic := make(map[string]peer.IDSlice)
+
+	// prune not-connected peers without a shard, grouping the rest by pubsub topic
+	for _, peerID := range notConnectedPeers {
+		topics, err := pm.host.Peerstore().(wps.WakuPeerstore).PubSubTopics(peerID)
+		// prune peers without pubsub topics
+		if err != nil || len(topics) == 0 {
+			pm.host.Peerstore().RemovePeer(peerID)
+			numPeers--
+		} else {
+			for topic := range topics {
+				peersByTopic[topic] = append(peersByTopic[topic], peerID)
+			}
+		}
+		if numPeers < pm.maxPeers {
+			pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
+			return
+		}
+	}
+
+	// calculate the avg peers per shard
+	total, maxPeerCnt := 0, 0
+	for _, peersInTopic := range peersByTopic {
+		peerLen := len(peersInTopic)
+		total += peerLen
+		if peerLen > maxPeerCnt {
+			maxPeerCnt = peerLen
+		}
+	}
+	avgPerTopic := max(1, total/maxPeerCnt)
+	// prune peers from shards with a higher-than-average count
+	for _, peers := range peersByTopic {
+		count := max(len(peers)-avgPerTopic, 0)
+		for i, pID := range peers {
+			if i > count {
+				break
+			}
+			pm.host.Peerstore().RemovePeer(pID)
+			numPeers--
+			if numPeers < pm.maxPeers {
+				pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
+				return
+			}
+		}
+	}
+	pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
+}

// This is a connectivity loop, which currently checks and prunes inbound connections.
@@ -444,11 +536,6 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
// AddDiscoveredPeer to add dynamically discovered peers.
// Note that these peers will not be set in service-slots.
func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
-	//Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes.
-	if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
-		return
-	}
-
//Check if the peer is already present, if so skip adding
_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
if err == nil {
@@ -503,10 +590,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
// addPeer adds peer to the peerStore.
// It also sets additional metadata such as origin and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
-	if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
-		pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers))
-		return errors.New("peer store capacity reached")
-	}

pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID))
if origin == wps.Static {
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
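Note: with the periodic prune loop in place, the hard capacity checks above are removed from AddDiscoveredPeer and addPeer, so the peer store may temporarily exceed maxPeers between prune runs instead of rejecting new peers outright. A standalone sketch of the shard-balancing arithmetic prunePeerStore applies, with hypothetical topics and peer counts (the built-in max requires Go 1.21, which the commit itself also relies on):

package main

import "fmt"

func main() {
	// hypothetical number of not-connected peers per pubsub topic
	peersByTopic := map[string]int{
		"/waku/2/rs/1/0": 40,
		"/waku/2/rs/1/1": 10,
		"/waku/2/rs/1/2": 4,
	}

	// same aggregation as prunePeerStore: total peers and the most populated topic
	total, maxPeerCnt := 0, 0
	for _, n := range peersByTopic {
		total += n
		if n > maxPeerCnt {
			maxPeerCnt = n
		}
	}
	avgPerTopic := max(1, total/maxPeerCnt) // 54/40 -> 1

	// each topic keeps roughly avgPerTopic peers; the excess is eligible for pruning
	for topic, n := range peersByTopic {
		fmt.Printf("%s: %d peers, excess %d\n", topic, n, max(n-avgPerTopic, 0))
	}
}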
18 changes: 9 additions & 9 deletions waku/v2/peerstore/waku_peer_store.go
@@ -51,9 +51,9 @@ type WakuPeerstore interface
PeersByOrigin(origin Origin) peer.IDSlice
SetENR(p peer.ID, enr *enode.Node) error
ENR(p peer.ID) (*enode.Node, error)
-	AddConnFailure(p peer.AddrInfo)
-	ResetConnFailures(p peer.AddrInfo)
-	ConnFailures(p peer.AddrInfo) int
+	AddConnFailure(pID peer.ID)
+	ResetConnFailures(pID peer.ID)
+	ConnFailures(pID peer.ID) int

SetDirection(p peer.ID, direction network.Direction) error
Direction(p peer.ID) (network.Direction, error)
@@ -136,24 +136,24 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID) (*enode.Node, error) {
}

// AddConnFailure increments connectionFailures for a peer
-func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) {
+func (ps *WakuPeerstoreImpl) AddConnFailure(pID peer.ID) {
	ps.connFailures.Lock()
	defer ps.connFailures.Unlock()
-	ps.connFailures.failures[p.ID]++
+	ps.connFailures.failures[pID]++
}

// ResetConnFailures resets connectionFailures for a peer to 0
-func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) {
+func (ps *WakuPeerstoreImpl) ResetConnFailures(pID peer.ID) {
	ps.connFailures.Lock()
	defer ps.connFailures.Unlock()
-	ps.connFailures.failures[p.ID] = 0
+	ps.connFailures.failures[pID] = 0
}

// ConnFailures fetches connectionFailures for a peer
-func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int {
+func (ps *WakuPeerstoreImpl) ConnFailures(pID peer.ID) int {
	ps.connFailures.RLock()
	defer ps.connFailures.RUnlock()
-	return ps.connFailures.failures[p.ID]
+	return ps.connFailures.failures[pID]
}

// SetDirection sets connection direction for a specific peer.
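Note: the switch from peer.AddrInfo to peer.ID above is a simplification; each method only ever read p.ID, so callers no longer have to wrap an ID in peer.AddrInfo{ID: ...} (as the protocol clients below previously did). A self-contained sketch of the RWMutex-guarded counter these methods operate on (assumed field layout, for illustration only):

package main

import (
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p/core/peer"
)

// connFailures mirrors the pattern above: one failure count per peer ID,
// guarded by an RWMutex so concurrent reads don't block each other.
type connFailures struct {
	sync.RWMutex
	failures map[peer.ID]int
}

func (c *connFailures) add(pID peer.ID) {
	c.Lock()
	defer c.Unlock()
	c.failures[pID]++
}

func (c *connFailures) reset(pID peer.ID) {
	c.Lock()
	defer c.Unlock()
	c.failures[pID] = 0
}

func (c *connFailures) count(pID peer.ID) int {
	c.RLock()
	defer c.RUnlock()
	return c.failures[pID]
}

func main() {
	cf := connFailures{failures: make(map[peer.ID]int)}
	id := peer.ID("peer-A") // placeholder ID for illustration
	cf.add(id)
	cf.add(id)
	fmt.Println(cf.count(id)) // 2
	cf.reset(id)
	fmt.Println(cf.count(id)) // 0
}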
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
@@ -246,7 +246,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
if err != nil {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
-			ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+			ps.AddConnFailure(peerID)
}
return err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/server.go
@@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
} else {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
-			ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+			ps.AddConnFailure(peerID)
}
}
logger.Error("opening peer stream", zap.Error(err))
2 changes: 1 addition & 1 deletion waku/v2/protocol/legacy_store/waku_store_client.go
@@ -208,7 +208,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
logger.Error("creating stream to peer", zap.Error(err))
store.metrics.RecordError(dialFailure)
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
-		ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
+		ps.AddConnFailure(selectedPeer)
}
return nil, err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/lightpush/waku_lightpush.go
@@ -198,7 +198,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure)
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
-		ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+		ps.AddConnFailure(peerID)
}
return nil, err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/metadata/waku_metadata.go
@@ -105,7 +105,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok {
-		ps.AddConnFailure(peer.AddrInfo{ID: peerID})
+		ps.AddConnFailure(peerID)
}
return nil, err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/peer_exchange/client.go
@@ -77,7 +77,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
if err != nil {
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
-		ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer})
+		ps.AddConnFailure(params.selectedPeer)
}
return err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/client.go
@@ -254,7 +254,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
-		ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
+		ps.AddConnFailure(selectedPeer)
}
return nil, err
}
