From 9cdf9ecc785ed45778afd579746fc71a9f984e07 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 29 Oct 2024 16:39:56 +0530 Subject: [PATCH] fix: use bad peer removal logic only for lightpush and filter --- waku/v2/peermanager/peer_manager.go | 26 ++++++-------------- waku/v2/protocol/filter/client.go | 5 ++++ waku/v2/protocol/lightpush/waku_lightpush.go | 5 ++++ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index ec4dcddea..69a0b23c1 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -102,7 +102,6 @@ const maxFailedAttempts = 5 const prunePeerStoreInterval = 10 * time.Minute const peerConnectivityLoopSecs = 15 const maxConnsToPeerRatio = 3 -const badPeersCleanupInterval = 1 * time.Minute const maxDialFailures = 2 // 80% relay peers 20% service peers @@ -258,14 +257,13 @@ func (pm *PeerManager) Start(ctx context.Context) { } } -func (pm *PeerManager) removeBadPeers() { - if !pm.RelayEnabled { - for _, peerID := range pm.host.Peerstore().Peers() { - if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures { - //delete peer from peerStore - pm.logger.Info("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) - pm.RemovePeer(peerID) - } +func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) { + if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures && + pm.peerConnector.onlineChecker.IsOnline() { + if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically. + //delete peer from peerStore + pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) + pm.RemovePeer(peerID) } } } @@ -273,17 +271,13 @@ func (pm *PeerManager) removeBadPeers() { func (pm *PeerManager) peerStoreLoop(ctx context.Context) { defer utils.LogOnPanic() t := time.NewTicker(prunePeerStoreInterval) - t1 := time.NewTicker(badPeersCleanupInterval) defer t.Stop() - defer t1.Stop() for { select { case <-ctx.Done(): return case <-t.C: pm.prunePeerStore() - case <-t1.C: - pm.removeBadPeers() } } } @@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { if err == nil || errors.Is(err, context.Canceled) { return } + if pm.peerConnector != nil { pm.peerConnector.addConnectionBackoff(peerID) } @@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) } } - if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures { - //delete peer from peerStore - pm.logger.Info("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) - pm.RemovePeer(peerID) - } } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 878b0d48d..a53e99dc9 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" @@ -249,6 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, wf.metrics.RecordError(dialFailure) if wf.pm != nil { wf.pm.HandleDialError(err, peerID) + if errors.Is(err, swarm.ErrAllDialsFailed) || + errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) { + wf.pm.CheckAndRemoveBadPeer(peerID) + } } return err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 9d6744315..a97007073 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" @@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p wakuLP.metrics.RecordError(dialFailure) if wakuLP.pm != nil { wakuLP.pm.HandleDialError(err, peerID) + if errors.Is(err, swarm.ErrAllDialsFailed) || + errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) { + wakuLP.pm.CheckAndRemoveBadPeer(peerID) + } } return nil, err }