From 53bc6ac8cfe6876f17afad04eef671ae5c7af999 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 13 Jun 2024 16:42:04 +0800 Subject: [PATCH] fix(p2p): refine chain exchange peer selection logic (#4376) --- src/chain_sync/chain_muxer.rs | 36 ++++------- src/chain_sync/metrics.rs | 30 --------- src/chain_sync/network_context.rs | 7 +- src/libp2p/metrics.rs | 36 ++++++++++- src/libp2p/mod.rs | 2 +- src/libp2p/peer_manager.rs | 103 +++++++++++++++--------------- src/libp2p/service.rs | 2 + 7 files changed, 105 insertions(+), 111 deletions(-) diff --git a/src/chain_sync/chain_muxer.rs b/src/chain_sync/chain_muxer.rs index 90caf185712..a6379b25a0b 100644 --- a/src/chain_sync/chain_muxer.rs +++ b/src/chain_sync/chain_muxer.rs @@ -9,6 +9,16 @@ use std::{ }; use crate::chain::{ChainStore, Error as ChainStoreError}; +use crate::chain_sync::{ + bad_block_cache::BadBlockCache, + metrics, + network_context::SyncNetworkContext, + sync_state::SyncState, + tipset_syncer::{ + TipsetProcessor, TipsetProcessorError, TipsetRangeSyncer, TipsetRangeSyncerError, + }, + validation::{TipsetValidationError, TipsetValidator}, +}; use crate::libp2p::{ hello::HelloRequest, NetworkEvent, NetworkMessage, PeerId, PeerManager, PubsubMessage, }; @@ -33,17 +43,6 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{debug, error, info, trace, warn}; -use crate::chain_sync::{ - bad_block_cache::BadBlockCache, - metrics, - network_context::SyncNetworkContext, - sync_state::SyncState, - tipset_syncer::{ - TipsetProcessor, TipsetProcessorError, TipsetRangeSyncer, TipsetRangeSyncerError, - }, - validation::{TipsetValidationError, TipsetValidator}, -}; - // Sync the messages for one or many tipsets @ a time // Lotus uses a window size of 8: https://github.com/filecoin-project/lotus/blob/c1d22d8b3298fdce573107413729be608e72187d/chain/sync.go#L56 const DEFAULT_REQUEST_WINDOW: usize = 8; @@ -288,10 +287,10 @@ where // Update the peer metadata based on the response match response { Some(_) => { - network.peer_manager().log_success(peer_id, dur); + network.peer_manager().log_success(&peer_id, dur); } None => { - network.peer_manager().log_failure(peer_id, dur); + network.peer_manager().log_failure(&peer_id, dur); } } } @@ -299,6 +298,7 @@ where async fn handle_peer_disconnected_event(network: SyncNetworkContext, peer_id: PeerId) { network.peer_manager().remove_peer(&peer_id); + network.peer_manager().unmark_peer_bad(&peer_id); } async fn gossipsub_block_to_full_tipset( @@ -369,13 +369,10 @@ where stateless_mode: bool, ) -> Result, ChainMuxerError> { let (tipset, source) = match event { - NetworkEvent::HelloRequestInbound { source, request } => { + NetworkEvent::HelloRequestInbound { .. } => { metrics::LIBP2P_MESSAGE_TOTAL .get_or_create(&metrics::values::HELLO_REQUEST_INBOUND) .inc(); - metrics::PEER_TIPSET_EPOCH - .get_or_create(&metrics::PeerLabel::new(source)) - .set(request.heaviest_tipset_height); return Ok(None); } NetworkEvent::HelloResponseOutbound { request, source } => { @@ -428,8 +425,6 @@ where metrics::LIBP2P_MESSAGE_TOTAL .get_or_create(&metrics::values::PEER_DISCONNECTED) .inc(); - // Remove peer id labels for disconnected peers - metrics::PEER_TIPSET_EPOCH.remove(&metrics::PeerLabel::new(peer_id)); // Spawn and immediately move on to the next event tokio::task::spawn(Self::handle_peer_disconnected_event( network.clone(), @@ -520,9 +515,6 @@ where network .peer_manager() .update_peer_head(source, Arc::new(tipset.clone().into_tipset())); - metrics::PEER_TIPSET_EPOCH - .get_or_create(&metrics::PeerLabel::new(source)) - .set(tipset.epoch()); Ok(Some((tipset, source))) } diff --git a/src/chain_sync/metrics.rs b/src/chain_sync/metrics.rs index 3f44460ca30..099fa8527cb 100644 --- a/src/chain_sync/metrics.rs +++ b/src/chain_sync/metrics.rs @@ -1,7 +1,6 @@ // Copyright 2019-2024 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use libp2p::PeerId; use once_cell::sync::Lazy; use prometheus_client::{ encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue, LabelSetEncoder}, @@ -82,15 +81,6 @@ pub static LAST_VALIDATED_TIPSET_EPOCH: Lazy = Lazy::new(|| { ); metric }); -pub static PEER_TIPSET_EPOCH: Lazy> = Lazy::new(|| { - let metric = Family::default(); - crate::metrics::default_registry().register( - "peer_tipset_epoch", - "peer tipset epoch", - metric.clone(), - ); - metric -}); pub static NETWORK_HEAD_EVALUATION_ERRORS: Lazy = Lazy::new(|| { let metric = Counter::default(); crate::metrics::default_registry().register( @@ -128,26 +118,6 @@ pub static FOLLOW_NETWORK_ERRORS: Lazy = Lazy::new(|| { metric }); -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct PeerLabel(PeerId); - -impl PeerLabel { - pub const fn new(peer: PeerId) -> Self { - Self(peer) - } -} - -impl EncodeLabelSet for PeerLabel { - fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), std::fmt::Error> { - let mut label_encoder = encoder.encode_label(); - let mut label_key_encoder = label_encoder.encode_label_key()?; - EncodeLabelKey::encode(&"PEER", &mut label_key_encoder)?; - let mut label_value_encoder = label_key_encoder.encode_label_value()?; - EncodeLabelValue::encode(&self.0.to_string(), &mut label_value_encoder)?; - label_value_encoder.finish() - } -} - #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct Libp2pMessageKindLabel(&'static str); diff --git a/src/chain_sync/network_context.rs b/src/chain_sync/network_context.rs index 2e8bbe7e797..e3864995402 100644 --- a/src/chain_sync/network_context.rs +++ b/src/chain_sync/network_context.rs @@ -289,7 +289,6 @@ where // No specific peer set, send requests to a shuffled set of top peers until // a request succeeds. let peers = self.peer_manager.top_peers_shuffled(); - let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS); for peer_id in peers.into_iter() { let peer_manager = self.peer_manager.clone(); @@ -395,7 +394,7 @@ where match res { Ok(Ok(Ok(bs_res))) => { // Successful response - peer_manager.log_success(peer_id, res_duration); + peer_manager.log_success(&peer_id, res_duration); trace!("Succeeded: ChainExchange Request to {peer_id}"); Ok(bs_res) } @@ -416,7 +415,7 @@ where // Ignore dropping peer on timeout for now. Can't be confident yet that the // specified timeout is adequate time. RequestResponseError::Timeout | RequestResponseError::Io(_) => { - peer_manager.log_failure(peer_id, res_duration); + peer_manager.log_failure(&peer_id, res_duration); } } debug!("Failed: ChainExchange Request to {peer_id}"); @@ -425,7 +424,7 @@ where Ok(Err(_)) | Err(_) => { // Sender channel internally dropped or timeout, both should log failure which // will negatively score the peer, but not drop yet. - peer_manager.log_failure(peer_id, res_duration); + peer_manager.log_failure(&peer_id, res_duration); debug!("Timeout: ChainExchange Request to {peer_id}"); Err(format!("Chain exchange request to {peer_id} timed out")) } diff --git a/src/libp2p/metrics.rs b/src/libp2p/metrics.rs index b172332e47e..da9da13e9fd 100644 --- a/src/libp2p/metrics.rs +++ b/src/libp2p/metrics.rs @@ -1,8 +1,12 @@ // Copyright 2019-2024 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use libp2p::PeerId; use once_cell::sync::Lazy; -use prometheus_client::metrics::{counter::Counter, gauge::Gauge}; +use prometheus_client::{ + encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue, LabelSetEncoder}, + metrics::{counter::Counter, family::Family, gauge::Gauge}, +}; pub static PEER_FAILURE_TOTAL: Lazy = Lazy::new(|| { let metric = Counter::default(); @@ -33,3 +37,33 @@ pub static BAD_PEERS: Lazy = Lazy::new(|| { ); metric }); + +pub static PEER_TIPSET_EPOCH: Lazy> = Lazy::new(|| { + let metric = Family::default(); + crate::metrics::default_registry().register( + "peer_tipset_epoch", + "peer tipset epoch", + metric.clone(), + ); + metric +}); + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct PeerLabel(PeerId); + +impl PeerLabel { + pub const fn new(peer: PeerId) -> Self { + Self(peer) + } +} + +impl EncodeLabelSet for PeerLabel { + fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), std::fmt::Error> { + let mut label_encoder = encoder.encode_label(); + let mut label_key_encoder = label_encoder.encode_label_key()?; + EncodeLabelKey::encode(&"PEER", &mut label_key_encoder)?; + let mut label_value_encoder = label_key_encoder.encode_label_value()?; + EncodeLabelValue::encode(&self.0.to_string(), &mut label_value_encoder)?; + label_value_encoder.finish() + } +} diff --git a/src/libp2p/mod.rs b/src/libp2p/mod.rs index 712d832ba08..32f6ad797f7 100644 --- a/src/libp2p/mod.rs +++ b/src/libp2p/mod.rs @@ -8,7 +8,7 @@ mod discovery; mod gossip_params; pub mod hello; pub mod keypair; -mod metrics; +pub mod metrics; mod peer_manager; pub mod ping; pub mod rpc; diff --git a/src/libp2p/peer_manager.rs b/src/libp2p/peer_manager.rs index 0a8ba1efbf7..c8dcb90ae80 100644 --- a/src/libp2p/peer_manager.rs +++ b/src/libp2p/peer_manager.rs @@ -27,11 +27,11 @@ const LOCAL_INV_ALPHA: u32 = 5; /// Global duration multiplier, affects duration delta change. const GLOBAL_INV_ALPHA: u32 = 20; -#[derive(Debug, Default)] +#[derive(Debug)] /// Contains info about the peer's head [Tipset], as well as the request stats. struct PeerInfo { /// Head tipset received from hello message. - head: Option>, + head: Arc, /// Number of successful requests. successes: u32, /// Number of failed requests. @@ -43,7 +43,7 @@ struct PeerInfo { impl PeerInfo { fn new(head: Arc) -> Self { Self { - head: Some(head), + head, successes: 0, failures: 0, average_time: Default::default(), @@ -96,21 +96,21 @@ impl PeerManager { pub fn update_peer_head(&self, peer_id: PeerId, ts: Arc) { let mut peers = self.peers.write(); trace!("Updating head for PeerId {}", &peer_id); + metrics::PEER_TIPSET_EPOCH + .get_or_create(&metrics::PeerLabel::new(peer_id)) + .set(ts.epoch()); if let Some(pi) = peers.full_peers.get_mut(&peer_id) { - pi.head = Some(ts); + pi.head = ts; } else { peers.full_peers.insert(peer_id, PeerInfo::new(ts)); - metrics::FULL_PEERS.inc(); + metrics::FULL_PEERS.set(peers.full_peers.len() as _); } } /// Gets the head epoch of a peer pub fn get_peer_head_epoch(&self, peer_id: &PeerId) -> Option { let peers = self.peers.read(); - peers - .full_peers - .get(peer_id) - .and_then(|pi| pi.head.as_ref().map(|ts| ts.epoch())) + peers.full_peers.get(peer_id).map(|pi| pi.head.epoch()) } /// Returns true if peer is not marked as bad or not already in set. @@ -127,22 +127,27 @@ impl PeerManager { let mut peers: Vec<_> = peer_lk .full_peers .iter() - .map(|(p, info)| { - let cost = if (info.successes + info.failures) > 0 { - // Calculate cost based on fail rate and latency - let fail_rate = f64::from(info.failures) / f64::from(info.successes); - info.average_time.as_secs_f64() + fail_rate * average_time.as_secs_f64() + .filter_map(|(p, info)| { + // Filter out nodes that are stateless (or far behind) + if info.head.epoch() > 0 { + let cost = if info.successes > 0 { + // Calculate cost based on fail rate and latency + let fail_rate = f64::from(info.failures) / f64::from(info.successes); + info.average_time.as_secs_f64() + fail_rate * average_time.as_secs_f64() + } else { + // There have been no failures or successes + average_time.as_secs_f64() * NEW_PEER_MUL + }; + Some((p, cost)) } else { - // There have been no failures or successes - average_time.as_secs_f64() * NEW_PEER_MUL - }; - (p, cost) + None + } }) .collect(); // Unstable sort because hashmap iter order doesn't need to be preserved. peers.sort_unstable_by(|(_, v1), (_, v2)| v1.partial_cmp(v2).unwrap_or(Ordering::Equal)); - peers.into_iter().map(|(p, _)| p).cloned().collect() + peers.into_iter().map(|(&peer, _)| peer).collect() } /// Return shuffled slice of ordered peers from the peer manager. Ordering @@ -156,7 +161,6 @@ impl PeerManager { // Shuffle top peers, to avoid sending all requests to same predictable peer. peers.shuffle(&mut rand::rngs::OsRng); - peers } @@ -178,66 +182,58 @@ impl PeerManager { /// Logs a success for the given peer, and updates the average request /// duration. - pub fn log_success(&self, peer: PeerId, dur: Duration) { + pub fn log_success(&self, peer: &PeerId, dur: Duration) { trace!("logging success for {peer}"); let mut peers = self.peers.write(); // Attempt to remove the peer and decrement bad peer count - if peers.bad_peers.remove(&peer) { - metrics::BAD_PEERS.dec(); + if peers.bad_peers.remove(peer) { + metrics::BAD_PEERS.set(peers.bad_peers.len() as _); }; - // If the peer is not already accounted for, increment full peer count - if !peers.full_peers.contains_key(&peer) { - metrics::FULL_PEERS.inc(); + if let Some(peer_stats) = peers.full_peers.get_mut(peer) { + peer_stats.successes += 1; + log_time(peer_stats, dur); } - let peer_stats = peers.full_peers.entry(peer).or_default(); - peer_stats.successes += 1; - log_time(peer_stats, dur); } /// Logs a failure for the given peer, and updates the average request /// duration. - pub fn log_failure(&self, peer: PeerId, dur: Duration) { + pub fn log_failure(&self, peer: &PeerId, dur: Duration) { trace!("logging failure for {peer}"); let mut peers = self.peers.write(); - if !peers.bad_peers.contains(&peer) { + if !peers.bad_peers.contains(peer) { metrics::PEER_FAILURE_TOTAL.inc(); - if !peers.full_peers.contains_key(&peer) { - metrics::FULL_PEERS.inc(); + if let Some(peer_stats) = peers.full_peers.get_mut(peer) { + peer_stats.failures += 1; + log_time(peer_stats, dur); } - let peer_stats = peers.full_peers.entry(peer).or_default(); - peer_stats.failures += 1; - log_time(peer_stats, dur); } } /// Removes a peer from the set and returns true if the value was present /// previously - pub fn mark_peer_bad(&self, peer_id: PeerId, reason: impl Into) -> bool { + pub fn mark_peer_bad(&self, peer_id: PeerId, reason: impl Into) { let mut peers = self.peers.write(); - let removed = remove_peer(&mut peers, &peer_id); - if removed { - metrics::FULL_PEERS.dec(); - } + remove_peer(&mut peers, &peer_id); // Add peer to bad peer set let reason = reason.into(); tracing::debug!(%peer_id, %reason, "marked peer bad"); if peers.bad_peers.insert(peer_id) { - metrics::BAD_PEERS.inc(); + metrics::BAD_PEERS.set(peers.bad_peers.len() as _); } + } - removed + pub fn unmark_peer_bad(&self, peer_id: &PeerId) { + let mut peers = self.peers.write(); + if peers.bad_peers.remove(peer_id) { + metrics::BAD_PEERS.set(peers.bad_peers.len() as _); + } } /// Remove peer from managed set, does not mark as bad - pub fn remove_peer(&self, peer_id: &PeerId) -> bool { + pub fn remove_peer(&self, peer_id: &PeerId) { let mut peers = self.peers.write(); - trace!("removed peer {peer_id}"); - let removed = remove_peer(&mut peers, peer_id); - if removed { - metrics::FULL_PEERS.dec(); - } - removed + remove_peer(&mut peers, peer_id); } /// Gets peer operation receiver @@ -308,13 +304,14 @@ impl PeerManager { } } -fn remove_peer(peers: &mut PeerSets, peer_id: &PeerId) -> bool { +fn remove_peer(peers: &mut PeerSets, peer_id: &PeerId) { + if peers.full_peers.remove(peer_id).is_some() { + metrics::FULL_PEERS.set(peers.full_peers.len() as _); + } trace!( "removing peer {peer_id}, remaining chain exchange peers: {}", peers.full_peers.len() ); - - peers.full_peers.remove(peer_id).is_some() } fn log_time(info: &mut PeerInfo, dur: Duration) { diff --git a/src/libp2p/service.rs b/src/libp2p/service.rs index 0e0f3672876..d65666ca5aa 100644 --- a/src/libp2p/service.rs +++ b/src/libp2p/service.rs @@ -587,6 +587,8 @@ async fn handle_discovery_event( } DiscoveryEvent::PeerDisconnected(peer_id) => { trace!("Peer disconnected, {peer_id}"); + // Remove peer id labels for disconnected peers + super::metrics::PEER_TIPSET_EPOCH.remove(&super::metrics::PeerLabel::new(peer_id)); emit_event(network_sender_out, NetworkEvent::PeerDisconnected(peer_id)).await; } DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {