feat: shard aware pruning of peer store #1193

Merged
5 commits merged on Aug 21, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ rlnKeystore.json
test_onchain.json
*.bkp
*.log
.vscode

# sqlite db
*.db
6 changes: 3 additions & 3 deletions waku/v2/node/wakunode2.go
@@ -416,9 +416,9 @@ func (w *WakuNode) Start(ctx context.Context) error {
if err != nil {
return err
}
w.peermanager.Start(ctx)
w.registerAndMonitorReachability(ctx)
}
w.peermanager.Start(ctx)

w.legacyStore = w.storeFactory(w)
w.legacyStore.SetHost(host)
@@ -752,7 +752,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info)
if err != nil {
w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info)
w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID)
return err
}

@@ -770,7 +770,7 @@ func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
}
}

w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info)
w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info.ID)

w.metrics.RecordDial()

3 changes: 2 additions & 1 deletion waku/v2/peermanager/peer_connector.go
@@ -279,8 +279,9 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.addConnectionBackoff(pi.ID)
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID)
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
<-sem
}
124 changes: 110 additions & 14 deletions waku/v2/peermanager/peer_manager.go
@@ -101,6 +101,8 @@ const (
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")

const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5

@@ -234,13 +236,115 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {

// Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) {
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)

pm.ctx = ctx
if pm.sub != nil && pm.RelayEnabled {
go pm.peerEventLoop(ctx)
if pm.RelayEnabled {
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
if pm.sub != nil {
go pm.peerEventLoop(ctx)
}
go pm.connectivityLoop(ctx)
}
go pm.peerStoreLoop(ctx)
}

func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
t := time.NewTicker(prunePeerStoreInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
}
}
go pm.connectivityLoop(ctx)
}

func (pm *PeerManager) prunePeerStore() {
peers := pm.host.Peerstore().Peers()
numPeers := len(peers)
if numPeers < pm.maxPeers {
pm.logger.Debug("peerstore size within capacity, not pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", numPeers))
return
}
peerCntBeforePruning := numPeers
pm.logger.Debug("peerstore capacity exceeded, hence pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", peerCntBeforePruning))

for _, peerID := range peers {
connFailues := pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID)
if connFailues > maxFailedAttempts {
// safety check so that we don't end up disconnecting connected peers.
if pm.host.Network().Connectedness(peerID) == network.Connected {
pm.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(peerID)
continue
}
pm.host.Peerstore().RemovePeer(peerID)
numPeers--
}
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
return
}
}

notConnectedPeers := pm.getPeersBasedOnconnectionStatus("", network.NotConnected)
peersByTopic := make(map[string]peer.IDSlice)
var prunedPeers peer.IDSlice

//prune not connected peers without shard
for _, peerID := range notConnectedPeers {
topics, err := pm.host.Peerstore().(wps.WakuPeerstore).PubSubTopics(peerID)
//Prune peers without pubsubtopics.
if err != nil || len(topics) == 0 {
if err != nil {
pm.logger.Error("pruning:failed to fetch pubsub topics", zap.Error(err), zap.Stringer("peer", peerID))
}
prunedPeers = append(prunedPeers, peerID)
pm.host.Peerstore().RemovePeer(peerID)
numPeers--
} else {
prunedPeers = append(prunedPeers, peerID)
for topic := range topics {
peersByTopic[topic] = append(peersByTopic[topic], peerID)
}
}
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
return
}
}
pm.logger.Debug("pruned notconnected peers", zap.Stringers("prunedPeers", prunedPeers))

// calculate the avg peers per shard
total, maxPeerCnt := 0, 0
for _, peersInTopic := range peersByTopic {
peerLen := len(peersInTopic)
total += peerLen
if peerLen > maxPeerCnt {
maxPeerCnt = peerLen
}
}
avgPerTopic := min(1, total/maxPeerCnt)
// prune peers from shard with higher than avg count

for topic, peers := range peersByTopic {
count := max(len(peers)-avgPerTopic, 0)
var prunedPeers peer.IDSlice
for i, pID := range peers {
if i > count {
break
}
prunedPeers = append(prunedPeers, pID)
pm.host.Peerstore().RemovePeer(pID)
numPeers--
if numPeers < pm.maxPeers {
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
return
}
}
pm.logger.Debug("pruned peers higher than average", zap.Stringers("prunedPeers", prunedPeers), zap.String("topic", topic))
}
pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
}

// This is a connectivity loop, which currently checks and prunes inbound connections.
@@ -444,11 +548,6 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
// AddDiscoveredPeer to add dynamically discovered peers.
// Note that these peers will not be set in service-slots.
func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
//Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes.
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
return
}

//Check if the peer is already present, if so skip adding
_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
if err == nil {
@@ -503,10 +602,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
// addPeer adds peer to the peerStore.
// It also sets additional metadata such as origin and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers))
return errors.New("peer store capacity reached")
}

pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID))
if origin == wps.Static {
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
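
For orientation, here is a minimal, self-contained Go sketch (separate from the diff itself) of the pruning priority that prunePeerStore applies: peers with repeated connection failures are dropped first, then disconnected peers that advertise no shard, then shards holding more than the average number of peers are trimmed, and pruning stops as soon as the peer count falls back under capacity. The peerInfo type, the prune helper, and the plain per-shard average are simplifications for illustration only; the real code operates on the libp2p peerstore and computes the per-shard threshold exactly as shown in the diff above.

package main

import "fmt"

// peerInfo is a hypothetical, simplified view of a peerstore entry.
type peerInfo struct {
	id           string
	connected    bool
	connFailures int
	shards       []string // pubsub topics (shards) the peer advertises
}

const maxFailedAttempts = 5

// prune returns the peer IDs that would be removed, following the same
// priority order as prunePeerStore: repeatedly failing peers first, then
// disconnected peers without any shard, then shards holding more than the
// average number of peers, stopping once the store is below capacity.
func prune(peers []peerInfo, capacity int) []string {
	var pruned []string
	remaining := len(peers)
	removed := make(map[string]bool)
	remove := func(id string) {
		removed[id] = true
		pruned = append(pruned, id)
		remaining--
	}

	// 1. Drop peers that keep failing to connect, but never connected ones.
	for _, p := range peers {
		if remaining < capacity {
			return pruned
		}
		if !p.connected && p.connFailures > maxFailedAttempts {
			remove(p.id)
		}
	}

	// 2. Drop disconnected peers with no shard; group the rest by shard.
	byShard := make(map[string][]peerInfo)
	for _, p := range peers {
		if remaining < capacity {
			return pruned
		}
		if removed[p.id] || p.connected {
			continue
		}
		if len(p.shards) == 0 {
			remove(p.id)
			continue
		}
		for _, s := range p.shards {
			byShard[s] = append(byShard[s], p)
		}
	}

	// 3. Trim shards holding more than the average number of peers,
	//    keeping the first avg entries of each shard.
	if len(byShard) == 0 {
		return pruned
	}
	total := 0
	for _, ps := range byShard {
		total += len(ps)
	}
	avg := total / len(byShard)
	for _, ps := range byShard {
		for i := avg; i < len(ps) && remaining >= capacity; i++ {
			remove(ps[i].id)
		}
	}
	return pruned
}

func main() {
	peers := []peerInfo{
		{id: "a", connFailures: 7},
		{id: "b", shards: []string{"shard-1"}},
		{id: "c", shards: []string{"shard-1"}},
		{id: "d", shards: []string{"shard-2"}},
	}
	fmt.Println(prune(peers, 3)) // prints [a c]
}

Note that the actual implementation additionally resets the failure counter of peers that are still connected rather than removing them; the connected check in step 1 above only approximates that behaviour.
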
18 changes: 9 additions & 9 deletions waku/v2/peerstore/waku_peer_store.go
@@ -51,9 +51,9 @@ type WakuPeerstore interface {
PeersByOrigin(origin Origin) peer.IDSlice
SetENR(p peer.ID, enr *enode.Node) error
ENR(p peer.ID) (*enode.Node, error)
AddConnFailure(p peer.AddrInfo)
ResetConnFailures(p peer.AddrInfo)
ConnFailures(p peer.AddrInfo) int
AddConnFailure(pID peer.ID)
ResetConnFailures(pID peer.ID)
ConnFailures(pID peer.ID) int

SetDirection(p peer.ID, direction network.Direction) error
Direction(p peer.ID) (network.Direction, error)
@@ -136,24 +136,24 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID) (*enode.Node, error) {
}

// AddConnFailure increments connectionFailures for a peer
func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) {
func (ps *WakuPeerstoreImpl) AddConnFailure(pID peer.ID) {
ps.connFailures.Lock()
defer ps.connFailures.Unlock()
ps.connFailures.failures[p.ID]++
ps.connFailures.failures[pID]++
}

// ResetConnFailures resets connectionFailures for a peer to 0
func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) {
func (ps *WakuPeerstoreImpl) ResetConnFailures(pID peer.ID) {
ps.connFailures.Lock()
defer ps.connFailures.Unlock()
ps.connFailures.failures[p.ID] = 0
ps.connFailures.failures[pID] = 0
}

// ConnFailures fetches connectionFailures for a peer
func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int {
func (ps *WakuPeerstoreImpl) ConnFailures(pID peer.ID) int {
ps.connFailures.RLock()
defer ps.connFailures.RUnlock()
return ps.connFailures.failures[p.ID]
return ps.connFailures.failures[pID]
}

// SetDirection sets connection direction for a specific peer.
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
@@ -246,7 +246,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
if err != nil {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
ps.AddConnFailure(peerID)
}
return err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/server.go
@@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
} else {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
ps.AddConnFailure(peerID)
}
}
logger.Error("opening peer stream", zap.Error(err))
2 changes: 1 addition & 1 deletion waku/v2/protocol/legacy_store/waku_store_client.go
@@ -208,7 +208,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
logger.Error("creating stream to peer", zap.Error(err))
store.metrics.RecordError(dialFailure)
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
ps.AddConnFailure(selectedPeer)
}
return nil, err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/lightpush/waku_lightpush.go
@@ -198,7 +198,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure)
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
ps.AddConnFailure(peerID)
}
return nil, err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/metadata/waku_metadata.go
@@ -105,7 +105,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
ps.AddConnFailure(peerID)
}
return nil, err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/peer_exchange/client.go
@@ -77,7 +77,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
if err != nil {
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer})
ps.AddConnFailure(params.selectedPeer)
}
return err
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/client.go
@@ -254,7 +254,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
ps.AddConnFailure(selectedPeer)
}
return nil, err
}