Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend the IP-level ban to Router-bound connections #3388

Draft
wants to merge 5 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ edition = "2021"
[features]
default = [ ]
metrics = [ "dep:metrics", "snarkos-node-bft-events/metrics", "snarkos-node-bft-ledger-service/metrics" ]
test = [ ]

[dependencies.aleo-std]
workspace = true
Expand Down Expand Up @@ -152,6 +153,10 @@ version = "0.4"
[dev-dependencies.rayon]
version = "1"

[dev-dependencies.snarkos-node-bft]
path = "."
features = [ "test" ]

[dev-dependencies.snarkos-node-bft-ledger-service]
path = "./ledger-service"
default-features = false
Expand Down
49 changes: 48 additions & 1 deletion node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ use futures::SinkExt;
use indexmap::{IndexMap, IndexSet};
use parking_lot::{Mutex, RwLock};
use rand::seq::{IteratorRandom, SliceRandom};
#[cfg(not(any(test, feature = "test")))]
use std::net::IpAddr;
use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
net::TcpStream,
Expand All @@ -88,6 +90,12 @@ const MIN_CONNECTED_VALIDATORS: usize = 175;
/// The maximum number of validators to send in a validators response event.
const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test, feature = "test")))]
const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 30;

/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
Expand Down Expand Up @@ -459,6 +467,18 @@ impl<N: Network> Gateway<N> {
Ok(())
}

/// Checks whether the given IP address is currently banned.
///
/// The query is forwarded to the ban list returned by `self.tcp.banned_peers()`.
/// This method is compiled out under `cfg(test)` or when the `test` feature is enabled.
#[cfg(not(any(test, feature = "test")))]
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp.banned_peers().is_ip_banned(ip)
}

/// Inserts a ban for the given IP address, or updates the existing one.
///
/// The request is forwarded to the ban list returned by `self.tcp.banned_peers()`.
/// This method is compiled out under `cfg(test)` or when the `test` feature is enabled.
#[cfg(not(any(test, feature = "test")))]
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp.banned_peers().update_ip_ban(ip);
}

#[cfg(feature = "metrics")]
fn update_metrics(&self) {
metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
Expand Down Expand Up @@ -875,6 +895,8 @@ impl<N: Network> Gateway<N> {
self.handle_unauthorized_validators();
// If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
self.handle_min_connected_validators();
// Unban any addresses whose ban time has expired.
self.handle_banned_ips();
}

/// Logs the connected validators.
Expand Down Expand Up @@ -955,6 +977,11 @@ impl<N: Network> Gateway<N> {
}
}
}

/// Removes the banned IP addresses whose ban time has expired.
///
/// Asks the ban list returned by `self.tcp.banned_peers()` to drop the old bans,
/// passing `IP_BAN_TIME_IN_SECS` as the ban duration in seconds.
fn handle_banned_ips(&self) {
self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
}
}

#[async_trait]
Expand Down Expand Up @@ -1105,9 +1132,29 @@ impl<N: Network> OnConnect for Gateway<N> {
impl<N: Network> Handshake for Gateway<N> {
/// Performs the handshake protocol.
async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
// Perform the handshake.
let peer_addr = connection.addr();
let peer_side = connection.side();

// Check (or impose) IP-level bans.
#[cfg(not(any(test, feature = "test")))]
if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
// If the IP is already banned, update the ban timestamp and reject the connection.
if self.is_ip_banned(peer_addr.ip()) {
self.update_ip_ban(peer_addr.ip());
trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
}

// Check the previous low-level connection timestamp.
if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) {
if peer_stats.timestamp().elapsed().as_secs() <= MIN_CONNECTION_INTERVAL_IN_SECS {
self.update_ip_ban(peer_addr.ip());
trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
}
}
}

let stream = self.borrow_stream(&mut connection);

// If this is an inbound connection, we log it, but don't know the listening address yet.
Expand Down
20 changes: 20 additions & 0 deletions node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ impl<N: Network> Router<N> {
Some(peer_addr)
};

// Check (or impose) IP-level bans.
#[cfg(not(any(test, feature = "test")))]
if !self.is_dev() && peer_side == ConnectionSide::Initiator {
// If the IP is already banned, update the ban timestamp and reject the connection.
if self.is_ip_banned(peer_addr.ip()) {
self.update_ip_ban(peer_addr.ip());
trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
}

// Check the previous low-level connection timestamp.
if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) {
if peer_stats.timestamp().elapsed().as_secs() <= Router::<N>::MIN_CONNECTION_INTERVAL_IN_SECS {
self.update_ip_ban(peer_addr.ip());
trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
}
}
}

// Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
let handshake_result = if peer_side == ConnectionSide::Responder {
self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await
Expand Down
9 changes: 9 additions & 0 deletions node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
/// The maximum number of provers to maintain connections with.
const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 30;

/// Handles the heartbeat request.
fn heartbeat(&self) {
Expand All @@ -60,6 +62,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
self.handle_trusted_peers();
// Keep the puzzle request up to date.
self.handle_puzzle_request();
// Unban any addresses whose ban time has expired.
self.handle_banned_ips();
}

/// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
Expand Down Expand Up @@ -283,4 +287,9 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
fn handle_puzzle_request(&self) {
// No-op
}

/// Removes the banned IP addresses whose ban time has expired.
///
/// Called from `heartbeat`; asks the ban list returned by `self.tcp().banned_peers()`
/// to drop the old bans, passing `Self::IP_BAN_TIME_IN_SECS` as the ban duration in seconds.
fn handle_banned_ips(&self) {
self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
}
}
17 changes: 17 additions & 0 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};

use anyhow::{bail, Result};
use parking_lot::{Mutex, RwLock};
#[cfg(not(any(test, feature = "test")))]
use std::net::IpAddr;
use std::{
collections::{HashMap, HashSet},
future::Future,
Expand Down Expand Up @@ -107,6 +109,9 @@ impl<N: Network> Router<N> {
/// The duration in seconds after which a connected peer is considered inactive or
/// disconnected if no message has been received in the meantime.
const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes
/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test, feature = "test")))]
const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10;
}

impl<N: Network> Router<N> {
Expand Down Expand Up @@ -427,6 +432,18 @@ impl<N: Network> Router<N> {
self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect()
}

/// Checks whether the given IP address is currently banned.
///
/// The query is forwarded to the ban list returned by `self.tcp.banned_peers()`.
/// This method is compiled out under `cfg(test)` or when the `test` feature is enabled.
#[cfg(not(any(test, feature = "test")))]
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp.banned_peers().is_ip_banned(ip)
}

/// Inserts a ban for the given IP address, or updates the existing one.
///
/// The request is forwarded to the ban list returned by `self.tcp.banned_peers()`.
/// This method is compiled out under `cfg(test)` or when the `test` feature is enabled.
#[cfg(not(any(test, feature = "test")))]
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp.banned_peers().update_ip_ban(ip);
}

#[cfg(feature = "metrics")]
fn update_metrics(&self) {
metrics::gauge(metrics::router::CONNECTED, self.connected_peers.read().len() as f64);
Expand Down
39 changes: 39 additions & 0 deletions node/tcp/src/helpers/banned_peers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (C) 2019-2023 Aleo Systems Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, net::IpAddr, time::Instant};

use parking_lot::RwLock;

/// Tracks the IP addresses that are currently banned, mapping each one to the
/// moment its ban was imposed or last refreshed.
#[derive(Default)]
pub struct BannedPeers(RwLock<HashMap<IpAddr, Instant>>);

impl BannedPeers {
    /// Returns `true` if a ban entry exists for the given IP address.
    ///
    /// Only the presence of an entry is checked here; the age of the ban is not
    /// inspected. Entries are dropped by [`BannedPeers::remove_old_bans`].
    pub fn is_ip_banned(&self, ip: IpAddr) -> bool {
        let bans = self.0.read();
        bans.get(&ip).is_some()
    }

    /// Bans the given IP address, or restarts the ban if one is already in place.
    pub fn update_ip_ban(&self, ip: IpAddr) {
        // Take the timestamp before acquiring the write lock.
        let banned_at = Instant::now();
        let mut bans = self.0.write();
        bans.insert(ip, banned_at);
    }

    /// Drops every ban that has been in place for at least `ban_time_in_secs` seconds.
    pub fn remove_old_bans(&self, ban_time_in_secs: u64) {
        let ban_time = std::time::Duration::from_secs(ban_time_in_secs);
        // Keep only the bans that are still younger than the ban time.
        self.0.write().retain(|_, banned_at| banned_at.elapsed() < ban_time);
    }
}
33 changes: 23 additions & 10 deletions node/tcp/src/helpers/known_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,66 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use std::{
collections::{hash_map::Entry, HashMap},
net::IpAddr,
sync::Arc,
time::Instant,
};

use parking_lot::RwLock;

use crate::Stats;

/// Contains statistics related to Tcp's peers, currently connected or not.
#[derive(Default)]
pub struct KnownPeers(RwLock<HashMap<SocketAddr, Arc<Stats>>>);
pub struct KnownPeers(RwLock<HashMap<IpAddr, Arc<Stats>>>);

impl KnownPeers {
/// Adds an address to the list of known peers.
pub fn add(&self, addr: SocketAddr) {
self.0.write().entry(addr).or_default();
pub fn add(&self, addr: IpAddr) {
let timestamp = Instant::now();
match self.0.write().entry(addr) {
Entry::Vacant(entry) => {
entry.insert(Arc::new(Stats::new(timestamp)));
}
Entry::Occupied(entry) => {
*entry.get().timestamp.write() = timestamp;
}
}
}

/// Returns the stats for the given peer.
pub fn get(&self, addr: SocketAddr) -> Option<Arc<Stats>> {
pub fn get(&self, addr: IpAddr) -> Option<Arc<Stats>> {
self.0.read().get(&addr).map(Arc::clone)
}

/// Removes an address from the list of known peers.
pub fn remove(&self, addr: SocketAddr) -> Option<Arc<Stats>> {
pub fn remove(&self, addr: IpAddr) -> Option<Arc<Stats>> {
self.0.write().remove(&addr)
}

/// Returns the list of all known peers and their stats.
pub fn snapshot(&self) -> HashMap<SocketAddr, Arc<Stats>> {
pub fn snapshot(&self) -> HashMap<IpAddr, Arc<Stats>> {
self.0.read().clone()
}

/// Registers a submission of a message to the given address.
pub fn register_sent_message(&self, to: SocketAddr, size: usize) {
pub fn register_sent_message(&self, to: IpAddr, size: usize) {
if let Some(stats) = self.0.read().get(&to) {
stats.register_sent_message(size);
}
}

/// Registers a receipt of a message to the given address.
pub fn register_received_message(&self, from: SocketAddr, size: usize) {
pub fn register_received_message(&self, from: IpAddr, size: usize) {
if let Some(stats) = self.0.read().get(&from) {
stats.register_received_message(size);
}
}

/// Registers a failure associated with the given address.
pub fn register_failure(&self, addr: SocketAddr) {
pub fn register_failure(&self, addr: IpAddr) {
if let Some(stats) = self.0.read().get(&addr) {
stats.register_failure();
}
Expand Down
3 changes: 3 additions & 0 deletions node/tcp/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
mod config;
pub use config::Config;

mod banned_peers;
pub use banned_peers::BannedPeers;

pub mod connections;
pub use connections::{Connection, ConnectionSide};

Expand Down
26 changes: 24 additions & 2 deletions node/tcp/src/helpers/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use parking_lot::RwLock;
use std::{
sync::atomic::{AtomicU64, Ordering::Relaxed},
time::Instant,
};

/// Contains statistics related to Tcp.
#[derive(Default)]
pub struct Stats {
/// The timestamp of the creation (for the node) or connection (to a peer).
pub(crate) timestamp: RwLock<Instant>,
/// The number of all messages sent.
msgs_sent: AtomicU64,
/// The number of all messages received.
Expand All @@ -30,6 +35,23 @@ pub struct Stats {
}

impl Stats {
/// Creates a new instance of the object.
///
/// The given `timestamp` is stored (behind a lock, so it can be updated later) as the
/// creation or connection time; all the remaining fields start from their `Default` values.
pub fn new(timestamp: Instant) -> Self {
Self {
timestamp: RwLock::new(timestamp),
msgs_sent: Default::default(),
msgs_received: Default::default(),
bytes_sent: Default::default(),
bytes_received: Default::default(),
failures: Default::default(),
}
}

/// Returns the creation or connection timestamp.
///
/// The value is copied out from under a read lock, so the returned `Instant` is a snapshot.
pub fn timestamp(&self) -> Instant {
*self.timestamp.read()
}

/// Returns the number of sent messages and their collective size in bytes.
pub fn sent(&self) -> (u64, u64) {
let msgs = self.msgs_sent.load(Relaxed);
Expand Down
Loading