Skip to content

Commit

Permalink
fix(p2p): refine chain exchange peer selection logic (#4376)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 authored Jun 13, 2024
1 parent 2ba03f5 commit 53bc6ac
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 111 deletions.
36 changes: 14 additions & 22 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ use std::{
};

use crate::chain::{ChainStore, Error as ChainStoreError};
use crate::chain_sync::{
bad_block_cache::BadBlockCache,
metrics,
network_context::SyncNetworkContext,
sync_state::SyncState,
tipset_syncer::{
TipsetProcessor, TipsetProcessorError, TipsetRangeSyncer, TipsetRangeSyncerError,
},
validation::{TipsetValidationError, TipsetValidator},
};
use crate::libp2p::{
hello::HelloRequest, NetworkEvent, NetworkMessage, PeerId, PeerManager, PubsubMessage,
};
Expand All @@ -33,17 +43,6 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};

use crate::chain_sync::{
bad_block_cache::BadBlockCache,
metrics,
network_context::SyncNetworkContext,
sync_state::SyncState,
tipset_syncer::{
TipsetProcessor, TipsetProcessorError, TipsetRangeSyncer, TipsetRangeSyncerError,
},
validation::{TipsetValidationError, TipsetValidator},
};

// Default request window size: sync the messages for one or many tipsets at a time.
// Lotus uses a window size of 8: https://github.com/filecoin-project/lotus/blob/c1d22d8b3298fdce573107413729be608e72187d/chain/sync.go#L56
const DEFAULT_REQUEST_WINDOW: usize = 8;
Expand Down Expand Up @@ -288,17 +287,18 @@ where
// Update the peer metadata based on the response
match response {
Some(_) => {
network.peer_manager().log_success(peer_id, dur);
network.peer_manager().log_success(&peer_id, dur);
}
None => {
network.peer_manager().log_failure(peer_id, dur);
network.peer_manager().log_failure(&peer_id, dur);
}
}
}
}

/// Handles a peer-disconnected event for `peer_id`.
///
/// Calls `remove_peer` and then `unmark_peer_bad` on the network's peer manager.
/// NOTE(review): what each of those calls updates is defined in `PeerManager`,
/// which is not visible in this hunk.
async fn handle_peer_disconnected_event(network: SyncNetworkContext<DB>, peer_id: PeerId) {
// Drop the peer from the peer manager first ...
network.peer_manager().remove_peer(&peer_id);
// ... then clear its "bad" mark as well.
network.peer_manager().unmark_peer_bad(&peer_id);
}

async fn gossipsub_block_to_full_tipset(
Expand Down Expand Up @@ -369,13 +369,10 @@ where
stateless_mode: bool,
) -> Result<Option<(FullTipset, PeerId)>, ChainMuxerError> {
let (tipset, source) = match event {
NetworkEvent::HelloRequestInbound { source, request } => {
NetworkEvent::HelloRequestInbound { .. } => {
metrics::LIBP2P_MESSAGE_TOTAL
.get_or_create(&metrics::values::HELLO_REQUEST_INBOUND)
.inc();
metrics::PEER_TIPSET_EPOCH
.get_or_create(&metrics::PeerLabel::new(source))
.set(request.heaviest_tipset_height);
return Ok(None);
}
NetworkEvent::HelloResponseOutbound { request, source } => {
Expand Down Expand Up @@ -428,8 +425,6 @@ where
metrics::LIBP2P_MESSAGE_TOTAL
.get_or_create(&metrics::values::PEER_DISCONNECTED)
.inc();
// Remove peer id labels for disconnected peers
metrics::PEER_TIPSET_EPOCH.remove(&metrics::PeerLabel::new(peer_id));
// Spawn and immediately move on to the next event
tokio::task::spawn(Self::handle_peer_disconnected_event(
network.clone(),
Expand Down Expand Up @@ -520,9 +515,6 @@ where
network
.peer_manager()
.update_peer_head(source, Arc::new(tipset.clone().into_tipset()));
metrics::PEER_TIPSET_EPOCH
.get_or_create(&metrics::PeerLabel::new(source))
.set(tipset.epoch());

Ok(Some((tipset, source)))
}
Expand Down
30 changes: 0 additions & 30 deletions src/chain_sync/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use libp2p::PeerId;
use once_cell::sync::Lazy;
use prometheus_client::{
encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue, LabelSetEncoder},
Expand Down Expand Up @@ -82,15 +81,6 @@ pub static LAST_VALIDATED_TIPSET_EPOCH: Lazy<Gauge> = Lazy::new(|| {
);
metric
});
/// Lazily initialised family of gauges keyed by [`PeerLabel`] (one gauge per peer).
///
/// On first access the family is registered in the crate's default metrics
/// registry under the name `peer_tipset_epoch` with help text "peer tipset epoch".
pub static PEER_TIPSET_EPOCH: Lazy<Family<PeerLabel, Gauge>> = Lazy::new(|| {
let metric = Family::default();
// Register a clone and keep the original as the static's value.
// NOTE(review): relies on `Family` clones sharing state — confirm against prometheus_client docs.
crate::metrics::default_registry().register(
"peer_tipset_epoch",
"peer tipset epoch",
metric.clone(),
);
metric
});
pub static NETWORK_HEAD_EVALUATION_ERRORS: Lazy<Counter> = Lazy::new(|| {
let metric = Counter::default();
crate::metrics::default_registry().register(
Expand Down Expand Up @@ -128,26 +118,6 @@ pub static FOLLOW_NETWORK_ERRORS: Lazy<Counter> = Lazy::new(|| {
metric
});

/// Metric label set identifying a single peer: a newtype wrapper around [`PeerId`].
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct PeerLabel(PeerId);

impl PeerLabel {
/// Wraps `peer` in a [`PeerLabel`]. Declared `const`, so it can be used in constant contexts.
pub const fn new(peer: PeerId) -> Self {
Self(peer)
}
}

/// Encodes a [`PeerLabel`] as a label set containing exactly one label.
impl EncodeLabelSet for PeerLabel {
/// Writes a single label to `encoder`: key `PEER`, value `self.0.to_string()`.
///
/// # Errors
/// Propagates any `std::fmt::Error` returned by the key/value encoders.
fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), std::fmt::Error> {
// Each encoder is derived from the previous one, so the statement order matters.
let mut label_encoder = encoder.encode_label();
let mut label_key_encoder = label_encoder.encode_label_key()?;
// The label key is the literal string "PEER".
EncodeLabelKey::encode(&"PEER", &mut label_key_encoder)?;
let mut label_value_encoder = label_key_encoder.encode_label_value()?;
// The label value is the peer id rendered as a string.
EncodeLabelValue::encode(&self.0.to_string(), &mut label_value_encoder)?;
// Finishing the value encoder yields the overall result.
label_value_encoder.finish()
}
}

/// Metric label wrapping a `&'static str`.
/// NOTE(review): presumably names a libp2p message kind; its constructor and
/// encoding impl are outside this hunk — confirm there.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Libp2pMessageKindLabel(&'static str);

Expand Down
7 changes: 3 additions & 4 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ where
// No specific peer set, send requests to a shuffled set of top peers until
// a request succeeds.
let peers = self.peer_manager.top_peers_shuffled();

let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
for peer_id in peers.into_iter() {
let peer_manager = self.peer_manager.clone();
Expand Down Expand Up @@ -395,7 +394,7 @@ where
match res {
Ok(Ok(Ok(bs_res))) => {
// Successful response
peer_manager.log_success(peer_id, res_duration);
peer_manager.log_success(&peer_id, res_duration);
trace!("Succeeded: ChainExchange Request to {peer_id}");
Ok(bs_res)
}
Expand All @@ -416,7 +415,7 @@ where
// Ignore dropping peer on timeout for now. Can't be confident yet that the
// specified timeout is adequate time.
RequestResponseError::Timeout | RequestResponseError::Io(_) => {
peer_manager.log_failure(peer_id, res_duration);
peer_manager.log_failure(&peer_id, res_duration);
}
}
debug!("Failed: ChainExchange Request to {peer_id}");
Expand All @@ -425,7 +424,7 @@ where
Ok(Err(_)) | Err(_) => {
// Sender channel internally dropped or timeout, both should log failure which
// will negatively score the peer, but not drop yet.
peer_manager.log_failure(peer_id, res_duration);
peer_manager.log_failure(&peer_id, res_duration);
debug!("Timeout: ChainExchange Request to {peer_id}");
Err(format!("Chain exchange request to {peer_id} timed out"))
}
Expand Down
36 changes: 35 additions & 1 deletion src/libp2p/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use libp2p::PeerId;
use once_cell::sync::Lazy;
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use prometheus_client::{
encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue, LabelSetEncoder},
metrics::{counter::Counter, family::Family, gauge::Gauge},
};

pub static PEER_FAILURE_TOTAL: Lazy<Counter> = Lazy::new(|| {
let metric = Counter::default();
Expand Down Expand Up @@ -33,3 +37,33 @@ pub static BAD_PEERS: Lazy<Gauge> = Lazy::new(|| {
);
metric
});

/// Lazily initialised family of gauges keyed by [`PeerLabel`] (one gauge per peer).
///
/// On first access the family is registered in the crate's default metrics
/// registry under the name `peer_tipset_epoch` with help text "peer tipset epoch".
pub static PEER_TIPSET_EPOCH: Lazy<Family<PeerLabel, Gauge>> = Lazy::new(|| {
let metric = Family::default();
// Register a clone and keep the original as the static's value.
// NOTE(review): relies on `Family` clones sharing state — confirm against prometheus_client docs.
crate::metrics::default_registry().register(
"peer_tipset_epoch",
"peer tipset epoch",
metric.clone(),
);
metric
});

/// Metric label set identifying a single peer: a newtype wrapper around [`PeerId`].
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct PeerLabel(PeerId);

impl PeerLabel {
/// Wraps `peer` in a [`PeerLabel`]. Declared `const`, so it can be used in constant contexts.
pub const fn new(peer: PeerId) -> Self {
Self(peer)
}
}

/// Encodes a [`PeerLabel`] as a label set containing exactly one label.
impl EncodeLabelSet for PeerLabel {
/// Writes a single label to `encoder`: key `PEER`, value `self.0.to_string()`.
///
/// # Errors
/// Propagates any `std::fmt::Error` returned by the key/value encoders.
fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), std::fmt::Error> {
// Each encoder is derived from the previous one, so the statement order matters.
let mut label_encoder = encoder.encode_label();
let mut label_key_encoder = label_encoder.encode_label_key()?;
// The label key is the literal string "PEER".
EncodeLabelKey::encode(&"PEER", &mut label_key_encoder)?;
let mut label_value_encoder = label_key_encoder.encode_label_value()?;
// The label value is the peer id rendered as a string.
EncodeLabelValue::encode(&self.0.to_string(), &mut label_value_encoder)?;
// Finishing the value encoder yields the overall result.
label_value_encoder.finish()
}
}
2 changes: 1 addition & 1 deletion src/libp2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod discovery;
mod gossip_params;
pub mod hello;
pub mod keypair;
mod metrics;
pub mod metrics;
mod peer_manager;
pub mod ping;
pub mod rpc;
Expand Down
Loading

0 comments on commit 53bc6ac

Please sign in to comment.