Skip to content

Commit

Permalink
merged changes to handle banning peers
Browse files Browse the repository at this point in the history
  • Loading branch information
zkxuerb committed Oct 24, 2024
2 parents ba2ce9a + 846244d commit b0b89c3
Show file tree
Hide file tree
Showing 18 changed files with 310 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ use futures::SinkExt;
use indexmap::{IndexMap, IndexSet};
use parking_lot::{Mutex, RwLock};
use rand::seq::{IteratorRandom, SliceRandom};
#[cfg(not(any(test, feature = "test")))]
use std::net::IpAddr;
use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
net::TcpStream,
Expand All @@ -89,6 +91,12 @@ const MIN_CONNECTED_VALIDATORS: usize = 175;
/// The maximum number of validators to send in a validators response event.
const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test, feature = "test")))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 300;

/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
Expand Down Expand Up @@ -460,6 +468,18 @@ impl<N: Network> Gateway<N> {
Ok(())
}

/// Check whether the given IP address is currently banned.
#[cfg(not(any(test, feature = "test")))]
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp.banned_peers().is_ip_banned(&ip)
}

/// Insert or update a banned IP.
#[cfg(not(any(test, feature = "test")))]
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp.banned_peers().update_ip_ban(ip);
}

#[cfg(feature = "metrics")]
fn update_metrics(&self) {
metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
Expand Down Expand Up @@ -885,6 +905,8 @@ impl<N: Network> Gateway<N> {
self.handle_unauthorized_validators();
// If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
self.handle_min_connected_validators();
// Unban any addresses whose ban time has expired.
self.handle_banned_ips();
}

/// Logs the connected validators.
Expand Down Expand Up @@ -965,6 +987,11 @@ impl<N: Network> Gateway<N> {
}
}
}

// Remove addresses whose ban time has expired.
fn handle_banned_ips(&self) {
self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
}
}

#[async_trait]
Expand Down Expand Up @@ -1125,6 +1152,28 @@ impl<N: Network> Handshake for Gateway<N> {
// Perform the handshake.
let peer_addr = connection.addr();
let peer_side = connection.side();

// Check (or impose) IP-level bans.
#[cfg(not(any(test, feature = "test")))]
if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
// If the IP is already banned reject the connection.
if self.is_ip_banned(peer_addr.ip()) {
trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
}

let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);

debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
if num_attempts >= MAX_CONNECTION_ATTEMPTS
{
self.update_ip_ban(peer_addr.ip());
trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
}

}

let stream = self.borrow_stream(&mut connection);

// If this is an inbound connection, we log it, but don't know the listening address yet.
Expand Down
5 changes: 3 additions & 2 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use crate::{
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
use snarkos_node_sync::{locators::BlockLocators, BlockSync, BlockSyncMode};
use snarkos_node_tcp::P2P;
use snarkvm::{
console::{network::Network, types::Field},
ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
Expand Down Expand Up @@ -67,7 +68,7 @@ impl<N: Network> Sync<N> {
/// Initializes a new sync instance.
pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
// Initialize the block sync module.
let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone());
let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
// Return the sync instance.
Self {
gateway,
Expand Down
20 changes: 20 additions & 0 deletions node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,26 @@ impl<N: Network> Router<N> {
Some(peer_addr)
};

// Check (or impose) IP-level bans.
#[cfg(not(any(test, feature = "test")))]
if !self.is_dev() && peer_side == ConnectionSide::Initiator {
// If the IP is already banned reject the connection.
if self.is_ip_banned(peer_addr.ip()) {
trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
}

let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);

debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
if num_attempts >= Router::<N>::MAX_CONNECTION_ATTEMPTS
{
self.update_ip_ban(peer_addr.ip());
trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
}
}

// Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
let handshake_result = if peer_side == ConnectionSide::Responder {
self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await
Expand Down
19 changes: 18 additions & 1 deletion node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
/// The maximum number of provers to maintain connections with.
const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 300;

/// Handles the heartbeat request.
fn heartbeat(&self) {
Expand All @@ -61,6 +63,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
self.handle_trusted_peers();
// Keep the puzzle request up to date.
self.handle_puzzle_request();
// Unban any addresses whose ban time has expired.
self.handle_banned_ips();
}

/// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
Expand Down Expand Up @@ -233,11 +237,19 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
if num_deficient > 0 {
// Initialize an RNG.
let rng = &mut OsRng;
let banned_ips = self.tcp().banned_peers().get_banned_ips();

// Attempt to connect to more peers.
for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
for peer_ip in self
.router()
.candidate_peers()
.into_iter()
.filter(|peer| !banned_ips.contains(&peer.ip()))
.choose_multiple(rng, num_deficient)
{
self.router().connect(peer_ip);
}

if self.router().allow_external_peers() {
// Request more peers from the connected peers.
for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
Expand Down Expand Up @@ -298,4 +310,9 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
fn handle_puzzle_request(&self) {
// No-op
}

// Remove addresses whose ban time has expired.
fn handle_banned_ips(&self) {
self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
}
}
20 changes: 20 additions & 0 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};

use anyhow::{Result, bail};
use parking_lot::{Mutex, RwLock};
#[cfg(not(any(test, feature = "test")))]
use std::net::IpAddr;
use std::{
collections::{HashMap, HashSet},
future::Future,
Expand Down Expand Up @@ -107,6 +109,12 @@ impl<N: Network> Router<N> {
const MAXIMUM_CANDIDATE_PEERS: usize = 10_000;
/// The maximum number of connection failures permitted by an inbound connecting peer.
const MAXIMUM_CONNECTION_FAILURES: usize = 5;
/// The maximum amount of connection attempts withing a 10 second threshold
#[cfg(not(any(test, feature = "test")))]
const MAX_CONNECTION_ATTEMPTS: usize = 10;
/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test, feature = "test")))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// The duration in seconds after which a connected peer is considered inactive or
/// disconnected if no message has been received in the meantime.
const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes
Expand Down Expand Up @@ -436,6 +444,18 @@ impl<N: Network> Router<N> {
}
}

/// Check whether the given IP address is currently banned.
#[cfg(not(any(test, feature = "test")))]
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp.banned_peers().is_ip_banned(&ip)
}

/// Insert or update a banned IP.
#[cfg(not(any(test, feature = "test")))]
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp.banned_peers().update_ip_ban(ip);
}

/// Returns the list of metrics for the connected peers.
pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect()
Expand Down
6 changes: 4 additions & 2 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {

// Initialize the ledger service.
let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
// Determine if the client should allow external peers.
let allow_external_peers = true;

Expand All @@ -123,6 +121,10 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
matches!(storage_mode, StorageMode::Development(_)),
)
.await?;

// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());

// Initialize the node.
let mut node = Self {
ledger: ledger.clone(),
Expand Down
6 changes: 4 additions & 2 deletions node/src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {

// Initialize the ledger service.
let ledger_service = Arc::new(ProverLedgerService::new());
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
// Determine if the prover should allow external peers.
let allow_external_peers = true;
// Determine if the prover should rotate external peers.
Expand All @@ -119,6 +117,10 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
matches!(storage_mode, StorageMode::Development(_)),
)
.await?;

// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());

// Compute the maximum number of puzzle instances.
let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
// Initialize the node.
Expand Down
7 changes: 4 additions & 3 deletions node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,10 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {

// Initialize the ledger service.
let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service.clone());

// Initialize the consensus.
let mut consensus =
Consensus::new(account.clone(), ledger_service, bft_ip, trusted_validators, storage_mode.clone())?;
Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
// Initialize the primary channels.
let (primary_sender, primary_receiver) = init_primary_channels::<N>();
// Start the consensus.
Expand All @@ -133,6 +131,9 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
)
.await?;

// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());

// Initialize the node.
let mut node = Self {
ledger: ledger.clone(),
Expand Down
4 changes: 4 additions & 0 deletions node/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ version = "=3.0.0"
path = "locators"
version = "=3.0.0"

[dependencies.snarkos-node-tcp]
path = "../tcp"
version = "=2.2.7"

[dependencies.snarkvm]
workspace = true

Expand Down
Loading

0 comments on commit b0b89c3

Please sign in to comment.