Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spam connections and forged block locators fix #3422

Merged
merged 8 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ use futures::SinkExt;
use indexmap::{IndexMap, IndexSet};
use parking_lot::{Mutex, RwLock};
use rand::seq::{IteratorRandom, SliceRandom};
#[cfg(not(any(test, feature = "test")))]
use std::net::IpAddr;
use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
net::TcpStream,
Expand All @@ -89,6 +91,12 @@ const MIN_CONNECTED_VALIDATORS: usize = 175;
/// The maximum number of validators to send in a validators response event.
const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test, feature = "test")))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 300;

/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
Expand Down Expand Up @@ -460,6 +468,18 @@ impl<N: Network> Gateway<N> {
Ok(())
}

/// Check whether the given IP address is currently banned.
#[cfg(not(any(test, feature = "test")))]
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp.banned_peers().is_ip_banned(&ip)
}

/// Insert or update a banned IP.
#[cfg(not(any(test, feature = "test")))]
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp.banned_peers().update_ip_ban(ip);
}

#[cfg(feature = "metrics")]
fn update_metrics(&self) {
metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
Expand Down Expand Up @@ -885,6 +905,8 @@ impl<N: Network> Gateway<N> {
self.handle_unauthorized_validators();
// If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
self.handle_min_connected_validators();
// Unban any addresses whose ban time has expired.
self.handle_banned_ips();
}

/// Logs the connected validators.
Expand Down Expand Up @@ -965,6 +987,11 @@ impl<N: Network> Gateway<N> {
}
}
}

// Remove addresses whose ban time has expired.
fn handle_banned_ips(&self) {
self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
}
}

#[async_trait]
Expand Down Expand Up @@ -1125,6 +1152,26 @@ impl<N: Network> Handshake for Gateway<N> {
// Perform the handshake.
let peer_addr = connection.addr();
let peer_side = connection.side();

// Check (or impose) IP-level bans.
#[cfg(not(any(test, feature = "test")))]
if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
// If the IP is already banned reject the connection.
if self.is_ip_banned(peer_addr.ip()) {
trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
}

let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);

debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
if num_attempts > MAX_CONNECTION_ATTEMPTS {
self.update_ip_ban(peer_addr.ip());
trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
}
}

let stream = self.borrow_stream(&mut connection);

// If this is an inbound connection, we log it, but don't know the listening address yet.
Expand Down
3 changes: 2 additions & 1 deletion node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
use snarkos_node_tcp::P2P;
use snarkvm::{
console::{network::Network, types::Field},
ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
Expand Down Expand Up @@ -67,7 +68,7 @@ impl<N: Network> Sync<N> {
/// Initializes a new sync instance.
pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
// Initialize the block sync module.
let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone());
let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone());
// Return the sync instance.
Self {
gateway,
Expand Down
20 changes: 20 additions & 0 deletions node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,26 @@ impl<N: Network> Router<N> {
Some(peer_addr)
};

// Check (or impose) IP-level bans.
#[cfg(not(any(test, feature = "test")))]
if !self.is_dev() && peer_side == ConnectionSide::Initiator {
// If the IP is already banned reject the connection.
if self.is_ip_banned(peer_addr.ip()) {
trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
}

let num_attempts =
self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);

debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
self.update_ip_ban(peer_addr.ip());
trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
}
}

// Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
let handshake_result = if peer_side == ConnectionSide::Responder {
self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await
Expand Down
10 changes: 10 additions & 0 deletions node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
/// The maximum number of provers to maintain connections with.
const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 300;

/// Handles the heartbeat request.
fn heartbeat(&self) {
Expand All @@ -61,6 +63,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
self.handle_trusted_peers();
// Keep the puzzle request up to date.
self.handle_puzzle_request();
// Unban any addresses whose ban time has expired.
self.handle_banned_ips();
}

/// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
Expand Down Expand Up @@ -238,6 +242,7 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
self.router().connect(peer_ip);
}

if self.router().allow_external_peers() {
// Request more peers from the connected peers.
for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
Expand Down Expand Up @@ -298,4 +303,9 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
fn handle_puzzle_request(&self) {
// No-op
}

// Remove addresses whose ban time has expired.
fn handle_banned_ips(&self) {
self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
}
}
26 changes: 24 additions & 2 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ pub use routing::*;

use crate::messages::NodeType;
use snarkos_account::Account;
use snarkos_node_tcp::{Config, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip};
use snarkos_node_tcp::{Config, P2P, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip};

use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};

use anyhow::{Result, bail};
use parking_lot::{Mutex, RwLock};
#[cfg(not(any(test, feature = "test")))]
use std::net::IpAddr;
use std::{
collections::{HashMap, HashSet, hash_map::Entry},
future::Future,
Expand Down Expand Up @@ -103,10 +106,16 @@ pub struct InnerRouter<N: Network> {
}

impl<N: Network> Router<N> {
/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test, feature = "test")))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// The maximum number of candidate peers permitted to be stored in the node.
const MAXIMUM_CANDIDATE_PEERS: usize = 10_000;
/// The maximum number of connection failures permitted by an inbound connecting peer.
const MAXIMUM_CONNECTION_FAILURES: usize = 5;
/// The maximum amount of connection attempts withing a 10 second threshold
#[cfg(not(any(test, feature = "test")))]
const MAX_CONNECTION_ATTEMPTS: usize = 10;
/// The duration in seconds after which a connected peer is considered inactive or
/// disconnected if no message has been received in the meantime.
const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes
Expand Down Expand Up @@ -390,7 +399,8 @@ impl<N: Network> Router<N> {

/// Returns the list of candidate peers.
pub fn candidate_peers(&self) -> HashSet<SocketAddr> {
self.candidate_peers.read().clone()
let banned_ips = self.tcp().banned_peers().get_banned_ips();
self.candidate_peers.read().iter().filter(|peer| !banned_ips.contains(&peer.ip())).copied().collect()
}

/// Returns the list of restricted peers.
Expand Down Expand Up @@ -439,6 +449,18 @@ impl<N: Network> Router<N> {
}
}

/// Check whether the given IP address is currently banned.
#[cfg(not(any(test, feature = "test")))]
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp.banned_peers().is_ip_banned(&ip)
}

/// Insert or update a banned IP.
#[cfg(not(any(test, feature = "test")))]
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp.banned_peers().update_ip_ban(ip);
}

/// Returns the list of metrics for the connected peers.
pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect()
Expand Down
6 changes: 4 additions & 2 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {

// Initialize the ledger service.
let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
// Determine if the client should allow external peers.
let allow_external_peers = true;

Expand All @@ -123,6 +121,10 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
matches!(storage_mode, StorageMode::Development(_)),
)
.await?;

// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
zkxuerb marked this conversation as resolved.
Show resolved Hide resolved

// Initialize the node.
let mut node = Self {
ledger: ledger.clone(),
Expand Down
6 changes: 4 additions & 2 deletions node/src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {

// Initialize the ledger service.
let ledger_service = Arc::new(ProverLedgerService::new());
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
// Determine if the prover should allow external peers.
let allow_external_peers = true;
// Determine if the prover should rotate external peers.
Expand All @@ -119,6 +117,10 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
matches!(storage_mode, StorageMode::Development(_)),
)
.await?;

// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
zkxuerb marked this conversation as resolved.
Show resolved Hide resolved

// Compute the maximum number of puzzle instances.
let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
// Initialize the node.
Expand Down
7 changes: 4 additions & 3 deletions node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,10 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {

// Initialize the ledger service.
let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service.clone());

// Initialize the consensus.
let mut consensus =
Consensus::new(account.clone(), ledger_service, bft_ip, trusted_validators, storage_mode.clone())?;
Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?;
// Initialize the primary channels.
let (primary_sender, primary_receiver) = init_primary_channels::<N>();
// Start the consensus.
Expand All @@ -133,6 +131,9 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
)
.await?;

// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone());
zkxuerb marked this conversation as resolved.
Show resolved Hide resolved

// Initialize the node.
let mut node = Self {
ledger: ledger.clone(),
Expand Down
4 changes: 4 additions & 0 deletions node/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ version = "=3.0.0"
path = "locators"
version = "=3.0.0"

[dependencies.snarkos-node-tcp]
path = "../tcp"
version = "=3.0.0"

[dependencies.snarkvm]
workspace = true

Expand Down
Loading