From c8cde54d4aa855de7e4e54cb46b75039f6fe758c Mon Sep 17 00:00:00 2001 From: zkxuerb Date: Mon, 14 Oct 2024 22:19:46 +0200 Subject: [PATCH 1/5] adds banned peers list --- node/bft/src/gateway.rs | 20 +++++++++ node/tcp/src/helpers/banned_peers.rs | 66 ++++++++++++++++++++++++++++ node/tcp/src/helpers/known_peers.rs | 33 +++++++++----- node/tcp/src/helpers/mod.rs | 3 ++ node/tcp/src/helpers/stats.rs | 26 ++++++++++- node/tcp/src/protocols/reading.rs | 6 +-- node/tcp/src/protocols/writing.rs | 4 +- node/tcp/src/tcp.rs | 27 +++++++----- 8 files changed, 156 insertions(+), 29 deletions(-) create mode 100644 node/tcp/src/helpers/banned_peers.rs diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index ac0d678f5b..63568db40d 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -65,6 +65,8 @@ use futures::SinkExt; use indexmap::{IndexMap, IndexSet}; use parking_lot::{Mutex, RwLock}; use rand::seq::{IteratorRandom, SliceRandom}; +#[cfg(not(any(test, feature = "test")))] +use std::net::IpAddr; use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{ net::TcpStream, @@ -89,6 +91,12 @@ const MIN_CONNECTED_VALIDATORS: usize = 175; /// The maximum number of validators to send in a validators response event. const MAX_VALIDATORS_TO_SEND: usize = 200; +/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. +#[cfg(not(any(test, feature = "test")))] +const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10; +/// The amount of time an IP address is prohibited from connecting. +const IP_BAN_TIME_IN_SECS: u64 = 300; + /// Part of the Gateway API that deals with networking. /// This is a separate trait to allow for easier testing/mocking. #[async_trait] @@ -460,6 +468,18 @@ impl Gateway { Ok(()) } + /// Check whether the given IP address is currently banned. + #[cfg(not(any(test, feature = "test")))] + fn is_ip_banned(&self, ip: IpAddr) -> bool { + self.tcp.banned_peers().is_ip_banned(&ip) + } + + /// Insert or update a banned IP. + #[cfg(not(any(test, feature = "test")))] + fn update_ip_ban(&self, ip: IpAddr) { + self.tcp.banned_peers().update_ip_ban(ip); + } + #[cfg(feature = "metrics")] fn update_metrics(&self) { metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64); diff --git a/node/tcp/src/helpers/banned_peers.rs b/node/tcp/src/helpers/banned_peers.rs new file mode 100644 index 0000000000..d1d3a28a90 --- /dev/null +++ b/node/tcp/src/helpers/banned_peers.rs @@ -0,0 +1,66 @@ +// Copyright 2024 Aleo Network Foundation +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::HashMap, net::IpAddr, time::Instant}; + +use parking_lot::RwLock; +use tracing::trace; + +/// Contains the ban details for a banned peer. +pub struct BanConfig { + /// The time when the ban was created. + banned_at: Instant, + /// Amount of times the peer has been banned. + banned_count: u8, +} + +impl BanConfig { + /// Creates a new ban config. + pub fn new(count: u8) -> Self { + Self { banned_at: Instant::now(), banned_count: count } + } +} + +/// Contains the set of peers currently banned by IP. +#[derive(Default)] +pub struct BannedPeers(RwLock>); + +impl BannedPeers { + /// Check whether the given IP address is currently banned. + pub fn is_ip_banned(&self, ip: &IpAddr) -> bool { + self.0.read().contains_key(ip) + } + + /// Get ban count for the given IP address. + pub fn get_ban_count(&self, ip: IpAddr) -> Option { + self.0.read().get(&ip).map(|delay| delay.banned_count) + } + + /// Insert or update a banned IP. + pub fn update_ip_ban(&self, ip: IpAddr) { + let count = self.get_ban_count(ip).unwrap_or(0).saturating_add(1u8); + + trace!("Banning IP: {:?} with count: {}", ip, count); + let ban_config = BanConfig::new(count); + self.0.write().insert(ip, ban_config); + } + + /// Remove the expired entries + pub fn remove_old_bans(&self, ban_time_in_secs: u64) { + self.0.write().retain(|_, ban_config| { + ban_config.banned_at.elapsed().as_secs() < (ban_time_in_secs << ban_config.banned_count.max(32)) + }); + } +} diff --git a/node/tcp/src/helpers/known_peers.rs b/node/tcp/src/helpers/known_peers.rs index 07f0508357..a50241b01e 100644 --- a/node/tcp/src/helpers/known_peers.rs +++ b/node/tcp/src/helpers/known_peers.rs @@ -13,7 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{ + collections::{hash_map::Entry, HashMap}, + net::IpAddr, + sync::Arc, + time::Instant, +}; use parking_lot::RwLock; @@ -21,45 +26,53 @@ use crate::Stats; /// Contains statistics related to Tcp's peers, currently connected or not. #[derive(Default)] -pub struct KnownPeers(RwLock>>); +pub struct KnownPeers(RwLock>>); impl KnownPeers { /// Adds an address to the list of known peers. - pub fn add(&self, addr: SocketAddr) { - self.0.write().entry(addr).or_default(); + pub fn add(&self, addr: IpAddr) { + let timestamp = Instant::now(); + match self.0.write().entry(addr) { + Entry::Vacant(entry) => { + entry.insert(Arc::new(Stats::new(timestamp))); + } + Entry::Occupied(entry) => { + *entry.get().timestamp.write() = timestamp; + } + } } /// Returns the stats for the given peer. - pub fn get(&self, addr: SocketAddr) -> Option> { + pub fn get(&self, addr: IpAddr) -> Option> { self.0.read().get(&addr).map(Arc::clone) } /// Removes an address from the list of known peers. - pub fn remove(&self, addr: SocketAddr) -> Option> { + pub fn remove(&self, addr: IpAddr) -> Option> { self.0.write().remove(&addr) } /// Returns the list of all known peers and their stats. - pub fn snapshot(&self) -> HashMap> { + pub fn snapshot(&self) -> HashMap> { self.0.read().clone() } /// Registers a submission of a message to the given address. - pub fn register_sent_message(&self, to: SocketAddr, size: usize) { + pub fn register_sent_message(&self, to: IpAddr, size: usize) { if let Some(stats) = self.0.read().get(&to) { stats.register_sent_message(size); } } /// Registers a receipt of a message to the given address. - pub fn register_received_message(&self, from: SocketAddr, size: usize) { + pub fn register_received_message(&self, from: IpAddr, size: usize) { if let Some(stats) = self.0.read().get(&from) { stats.register_received_message(size); } } /// Registers a failure associated with the given address. - pub fn register_failure(&self, addr: SocketAddr) { + pub fn register_failure(&self, addr: IpAddr) { if let Some(stats) = self.0.read().get(&addr) { stats.register_failure(); } diff --git a/node/tcp/src/helpers/mod.rs b/node/tcp/src/helpers/mod.rs index 55cf62818b..d8b9a3e6c5 100644 --- a/node/tcp/src/helpers/mod.rs +++ b/node/tcp/src/helpers/mod.rs @@ -16,6 +16,9 @@ mod config; pub use config::Config; +mod banned_peers; +pub use banned_peers::BannedPeers; + pub mod connections; pub use connections::{Connection, ConnectionSide}; diff --git a/node/tcp/src/helpers/stats.rs b/node/tcp/src/helpers/stats.rs index 403640d4cb..aed215e6d1 100644 --- a/node/tcp/src/helpers/stats.rs +++ b/node/tcp/src/helpers/stats.rs @@ -13,11 +13,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; +use parking_lot::RwLock; +use std::{ + sync::atomic::{AtomicU64, Ordering::Relaxed}, + time::Instant, +}; /// Contains statistics related to Tcp. -#[derive(Default)] pub struct Stats { + /// The timestamp of the creation (for the node) or connection (to a peer). + pub(crate) timestamp: RwLock, /// The number of all messages sent. msgs_sent: AtomicU64, /// The number of all messages received. @@ -31,6 +36,23 @@ pub struct Stats { } impl Stats { + /// Creates a new instance of the object. + pub fn new(timestamp: Instant) -> Self { + Self { + timestamp: RwLock::new(timestamp), + msgs_sent: Default::default(), + msgs_received: Default::default(), + bytes_sent: Default::default(), + bytes_received: Default::default(), + failures: Default::default(), + } + } + + /// Returns the creation or connection timestamp. + pub fn timestamp(&self) -> Instant { + *self.timestamp.read() + } + /// Returns the number of sent messages and their collective size in bytes. pub fn sent(&self) -> (u64, u64) { let msgs = self.msgs_sent.load(Relaxed); diff --git a/node/tcp/src/protocols/reading.rs b/node/tcp/src/protocols/reading.rs index a11f88fd89..9e403b3fa6 100644 --- a/node/tcp/src/protocols/reading.rs +++ b/node/tcp/src/protocols/reading.rs @@ -144,7 +144,7 @@ impl ReadingInternal for R { while let Some(msg) = inbound_message_receiver.recv().await { if let Err(e) = self_clone.process_message(addr, msg).await { error!(parent: node.span(), "can't process a message from {addr}: {e}"); - node.known_peers().register_failure(addr); + node.known_peers().register_failure(addr.ip()); } #[cfg(feature = "metrics")] metrics::decrement_gauge(metrics::tcp::TCP_TASKS, 1f64); @@ -179,7 +179,7 @@ impl ReadingInternal for R { } Err(e) => { error!(parent: node.span(), "can't read from {addr}: {e}"); - node.known_peers().register_failure(addr); + node.known_peers().register_failure(addr.ip()); if node.config().fatal_io_errors.contains(&e.kind()) { break; } @@ -230,7 +230,7 @@ impl Decoder for CountingCodec { if ret.is_some() { self.acc = 0; - self.node.known_peers().register_received_message(self.addr, read_len); + self.node.known_peers().register_received_message(self.addr.ip(), read_len); self.node.stats().register_received_message(read_len); } else { self.acc = read_len; diff --git a/node/tcp/src/protocols/writing.rs b/node/tcp/src/protocols/writing.rs index d5cdad8682..16ea2dc490 100644 --- a/node/tcp/src/protocols/writing.rs +++ b/node/tcp/src/protocols/writing.rs @@ -220,12 +220,12 @@ impl WritingInternal for W { match self_clone.write_to_stream(*msg, &mut framed).await { Ok(len) => { let _ = wrapped_msg.delivery_notification.send(Ok(())); - node.known_peers().register_sent_message(addr, len); + node.known_peers().register_sent_message(addr.ip(), len); node.stats().register_sent_message(len); trace!(parent: node.span(), "sent {}B to {}", len, addr); } Err(e) => { - node.known_peers().register_failure(addr); + node.known_peers().register_failure(addr.ip()); error!(parent: node.span(), "couldn't send a message to {}: {}", addr, e); let is_fatal = node.config().fatal_io_errors.contains(&e.kind()); let _ = wrapped_msg.delivery_notification.send(Err(e)); diff --git a/node/tcp/src/tcp.rs b/node/tcp/src/tcp.rs index 364dd770ff..6d5bc2de34 100644 --- a/node/tcp/src/tcp.rs +++ b/node/tcp/src/tcp.rs @@ -23,7 +23,7 @@ use std::{ atomic::{AtomicUsize, Ordering::*}, Arc, }, - time::Duration, + time::{Duration, Instant}, }; use once_cell::sync::OnceCell; @@ -40,6 +40,7 @@ use tracing::*; use crate::{ connections::{Connection, ConnectionSide, Connections}, protocols::{Protocol, Protocols}, + BannedPeers, Config, KnownPeers, Stats, @@ -76,6 +77,8 @@ pub struct InnerTcp { connections: Connections, /// Collects statistics related to the node's peers. known_peers: KnownPeers, + /// Contains the set of currently banned peers. + banned_peers: BannedPeers, /// Collects statistics related to the node itself. stats: Stats, /// The node's tasks. @@ -102,7 +105,8 @@ impl Tcp { connecting: Default::default(), connections: Default::default(), known_peers: Default::default(), - stats: Default::default(), + banned_peers: Default::default(), + stats: Stats::new(Instant::now()), tasks: Default::default(), })); @@ -166,6 +170,12 @@ impl Tcp { &self.known_peers } + /// Returns a reference to the set of currently banned peers. + #[inline] + pub fn banned_peers(&self) -> &BannedPeers { + &self.banned_peers + } + /// Returns a reference to the statistics. #[inline] pub fn stats(&self) -> &Stats { @@ -256,7 +266,7 @@ impl Tcp { if let Err(ref e) = ret { self.connecting.lock().remove(&addr); - self.known_peers().register_failure(addr); + self.known_peers().register_failure(addr.ip()); error!(parent: self.span(), "Unable to initiate a connection with {addr}: {e}"); } @@ -283,13 +293,6 @@ impl Tcp { task.abort(); } - // If the (owning) Tcp was not the initiator of the connection, it doesn't know the listening address - // of the associated peer, so the related stats are unreliable; the next connection initiated by the - // peer could be bound to an entirely different port number - if conn.side() == ConnectionSide::Initiator { - self.known_peers().remove(conn.addr()); - } - debug!(parent: self.span(), "Disconnected from {}", conn.addr()); } else { warn!(parent: self.span(), "Failed to disconnect, was not connected to {addr}"); @@ -387,7 +390,7 @@ impl Tcp { tokio::spawn(async move { if let Err(e) = tcp.adapt_stream(stream, addr, ConnectionSide::Responder).await { tcp.connecting.lock().remove(&addr); - tcp.known_peers().register_failure(addr); + tcp.known_peers().register_failure(addr.ip()); error!(parent: tcp.span(), "Failed to connect with {addr}: {e}"); } }); @@ -427,7 +430,7 @@ impl Tcp { /// Prepares the freshly acquired connection to handle the protocols the Tcp implements. async fn adapt_stream(&self, stream: TcpStream, peer_addr: SocketAddr, own_side: ConnectionSide) -> io::Result<()> { - self.known_peers.add(peer_addr); + self.known_peers.add(peer_addr.ip()); // Register the port seen by the peer. if own_side == ConnectionSide::Initiator { From d69ce13e16e17d9704e4f4b5d96a35148d6a62f4 Mon Sep 17 00:00:00 2001 From: zkxuerb Date: Mon, 14 Oct 2024 23:33:49 +0200 Subject: [PATCH 2/5] bans peers for not responding to requests and spamming connections --- Cargo.lock | 1 + node/bft/src/gateway.rs | 34 ++++++++++++++++++++ node/bft/src/sync/mod.rs | 3 +- node/router/src/handshake.rs | 26 ++++++++++++++++ node/router/src/heartbeat.rs | 19 +++++++++++- node/router/src/lib.rs | 20 ++++++++++++ node/src/client/mod.rs | 6 ++-- node/src/prover/mod.rs | 6 ++-- node/src/validator/mod.rs | 7 +++-- node/sync/Cargo.toml | 4 +++ node/sync/src/block_sync.rs | 46 ++++++++++++++++++++++++++-- node/tcp/src/helpers/banned_peers.rs | 40 +++++++++++++++++------- 12 files changed, 189 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac26621d4d..9b55a3109f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3504,6 +3504,7 @@ dependencies = [ "snarkos-node-router", "snarkos-node-sync-communication-service", "snarkos-node-sync-locators", + "snarkos-node-tcp", "snarkvm", "tokio", "tracing", diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 63568db40d..b72fb74f64 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -896,6 +896,8 @@ impl Gateway { self.handle_unauthorized_validators(); // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`. self.handle_min_connected_validators(); + // Unban any addresses whose ban time has expired. + self.handle_banned_ips(); } /// Logs the connected validators. @@ -976,6 +978,11 @@ impl Gateway { } } } + + // Remove addresses whose ban time has expired. + fn handle_banned_ips(&self) { + self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS); + } } #[async_trait] @@ -1129,6 +1136,33 @@ impl Handshake for Gateway { // Perform the handshake. let peer_addr = connection.addr(); let peer_side = connection.side(); + + // Check (or impose) IP-level bans. + #[cfg(not(any(test, feature = "test")))] + if self.dev().is_none() && peer_side == ConnectionSide::Initiator { + // If the IP is already banned, update the ban timestamp and reject the connection. + if self.is_ip_banned(peer_addr.ip()) { + self.update_ip_ban(peer_addr.ip()); + trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); + } + + // Check the previous low-level connection timestamp. + if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) { + let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), 10); + + debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); + debug!("Seconds since connection attempt: {}", peer_stats.timestamp().elapsed().as_secs()); + if peer_stats.timestamp().elapsed().as_secs() <= MIN_CONNECTION_INTERVAL_IN_SECS + && num_attempts >= MAX_CONNECTION_ATTEMPTS + { + self.update_ip_ban(peer_addr.ip()); + trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); + } + } + } + let stream = self.borrow_stream(&mut connection); // If this is an inbound connection, we log it, but don't know the listening address yet. diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 46cb269a54..8554523ab3 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -24,6 +24,7 @@ use crate::{ use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event}; use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_sync::{locators::BlockLocators, BlockSync, BlockSyncMode}; +use snarkos_node_tcp::P2P; use snarkvm::{ console::{network::Network, types::Field}, ledger::{authority::Authority, block::Block, narwhal::BatchCertificate}, @@ -67,7 +68,7 @@ impl Sync { /// Initializes a new sync instance. pub fn new(gateway: Gateway, storage: Storage, ledger: Arc>) -> Self { // Initialize the block sync module. - let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone()); + let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone()); // Return the sync instance. Self { gateway, diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 3b8d083c23..f412227705 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -101,6 +101,32 @@ impl Router { Some(peer_addr) }; + // Check (or impose) IP-level bans. + #[cfg(not(any(test, feature = "test")))] + if !self.is_dev() && peer_side == ConnectionSide::Initiator { + // If the IP is already banned, update the ban timestamp and reject the connection. + if self.is_ip_banned(peer_addr.ip()) { + self.update_ip_ban(peer_addr.ip()); + trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); + } + + // Check the previous low-level connection timestamp. + if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) { + let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), 10); + + debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); + debug!("Seconds since connection attempt: {}", peer_stats.timestamp().elapsed().as_secs()); + if peer_stats.timestamp().elapsed().as_secs() <= Router::::MIN_CONNECTION_INTERVAL_IN_SECS + && num_attempts >= Router::::MAX_CONNECTION_ATTEMPTS + { + self.update_ip_ban(peer_addr.ip()); + trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); + } + } + } + // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time. let handshake_result = if peer_side == ConnectionSide::Responder { self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index baef5afdfc..8642d412ff 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -43,6 +43,8 @@ pub trait Heartbeat: Outbound { const MAXIMUM_NUMBER_OF_PEERS: usize = 21; /// The maximum number of provers to maintain connections with. const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4; + /// The amount of time an IP address is prohibited from connecting. + const IP_BAN_TIME_IN_SECS: u64 = 300; /// Handles the heartbeat request. fn heartbeat(&self) { @@ -61,6 +63,8 @@ pub trait Heartbeat: Outbound { self.handle_trusted_peers(); // Keep the puzzle request up to date. self.handle_puzzle_request(); + // Unban any addresses whose ban time has expired. + self.handle_banned_ips(); } /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers. @@ -219,11 +223,19 @@ pub trait Heartbeat: Outbound { if num_deficient > 0 { // Initialize an RNG. let rng = &mut OsRng; + let banned_ips = self.tcp().banned_peers().get_banned_ips(); // Attempt to connect to more peers. - for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) { + for peer_ip in self + .router() + .candidate_peers() + .into_iter() + .filter(|peer| !banned_ips.contains(&peer.ip())) + .choose_multiple(rng, num_deficient) + { self.router().connect(peer_ip); } + if self.router().allow_external_peers() { // Request more peers from the connected peers. for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) { @@ -284,4 +296,9 @@ pub trait Heartbeat: Outbound { fn handle_puzzle_request(&self) { // No-op } + + // Remove addresses whose ban time has expired. + fn handle_banned_ips(&self) { + self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS); + } } diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 8d9152c744..b484aa6bb8 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -46,6 +46,8 @@ use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; use anyhow::{bail, Result}; use parking_lot::{Mutex, RwLock}; +#[cfg(not(any(test, feature = "test")))] +use std::net::IpAddr; use std::{ collections::{HashMap, HashSet}, future::Future, @@ -105,6 +107,12 @@ impl Router { const MAXIMUM_CANDIDATE_PEERS: usize = 10_000; /// The maximum number of connection failures permitted by an inbound connecting peer. const MAXIMUM_CONNECTION_FAILURES: usize = 5; + /// The maximum amount of connection attempts withing a 10 second threshold + #[cfg(not(any(test, feature = "test")))] + const MAX_CONNECTION_ATTEMPTS: usize = 10; + /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. + #[cfg(not(any(test, feature = "test")))] + const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10; /// The duration in seconds after which a connected peer is considered inactive or /// disconnected if no message has been received in the meantime. const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes @@ -426,6 +434,18 @@ impl Router { } } + /// Check whether the given IP address is currently banned. + #[cfg(not(any(test, feature = "test")))] + fn is_ip_banned(&self, ip: IpAddr) -> bool { + self.tcp.banned_peers().is_ip_banned(&ip) + } + + /// Insert or update a banned IP. + #[cfg(not(any(test, feature = "test")))] + fn update_ip_ban(&self, ip: IpAddr) { + self.tcp.banned_peers().update_ip_ban(ip); + } + /// Returns the list of metrics for the connected peers. pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> { self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect() diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index 0858547df6..a8920ed43b 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -105,8 +105,6 @@ impl> Client { // Initialize the ledger service. let ledger_service = Arc::new(CoreLedgerService::::new(ledger.clone(), shutdown.clone())); - // Initialize the sync module. - let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone()); // Determine if the client should allow external peers. let allow_external_peers = true; @@ -121,6 +119,10 @@ impl> Client { matches!(storage_mode, StorageMode::Development(_)), ) .await?; + + // Initialize the sync module. + let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone()); + // Initialize the node. let mut node = Self { ledger: ledger.clone(), diff --git a/node/src/prover/mod.rs b/node/src/prover/mod.rs index efb261146a..48bab5eaee 100644 --- a/node/src/prover/mod.rs +++ b/node/src/prover/mod.rs @@ -100,8 +100,6 @@ impl> Prover { // Initialize the ledger service. let ledger_service = Arc::new(ProverLedgerService::new()); - // Initialize the sync module. - let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone()); // Determine if the prover should allow external peers. let allow_external_peers = true; @@ -116,6 +114,10 @@ impl> Prover { matches!(storage_mode, StorageMode::Development(_)), ) .await?; + + // Initialize the sync module. + let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone()); + // Compute the maximum number of puzzle instances. let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6); // Initialize the node. diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index fd9a8721fe..1f9a8ac8d7 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -107,12 +107,10 @@ impl> Validator { // Initialize the ledger service. let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone())); - // Initialize the sync module. - let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service.clone()); // Initialize the consensus. let mut consensus = - Consensus::new(account.clone(), ledger_service, bft_ip, trusted_validators, storage_mode.clone())?; + Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?; // Initialize the primary channels. let (primary_sender, primary_receiver) = init_primary_channels::(); // Start the consensus. @@ -130,6 +128,9 @@ impl> Validator { ) .await?; + // Initialize the sync module. + let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone()); + // Initialize the node. let mut node = Self { ledger: ledger.clone(), diff --git a/node/sync/Cargo.toml b/node/sync/Cargo.toml index 486234169b..f86234db05 100644 --- a/node/sync/Cargo.toml +++ b/node/sync/Cargo.toml @@ -66,6 +66,10 @@ version = "=2.2.7" path = "locators" version = "=2.2.7" +[dependencies.snarkos-node-tcp] +path = "../tcp" +version = "=2.2.7" + [dependencies.snarkvm] workspace = true diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index d512e7478c..7543d2fbab 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -21,6 +21,7 @@ use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_router::messages::DataBlocks; use snarkos_node_sync_communication_service::CommunicationService; use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS}; +use snarkos_node_tcp::Tcp; use snarkvm::prelude::{block::Block, Network}; use anyhow::{bail, ensure, Result}; @@ -29,7 +30,7 @@ use itertools::Itertools; use parking_lot::{Mutex, RwLock}; use rand::{prelude::IteratorRandom, CryptoRng, Rng}; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, @@ -90,6 +91,8 @@ pub struct BlockSync { /// This map is a linearly-increasing map of block heights to block hashes, /// updated solely from the ledger and candidate blocks (not from peers' block locators, to ensure there are no forks). canon: Arc>, + /// The TCP stack. + tcp: Tcp, /// The map of peer IP to their block locators. /// The block locators are consistent with the canonical map and every other peer's block locators. locators: Arc>>>, @@ -115,10 +118,11 @@ pub struct BlockSync { impl BlockSync { /// Initializes a new block sync module. - pub fn new(mode: BlockSyncMode, ledger: Arc>) -> Self { + pub fn new(mode: BlockSyncMode, ledger: Arc>, tcp: Tcp) -> Self { Self { mode, canon: ledger, + tcp, locators: Default::default(), common_ancestors: Default::default(), requests: Default::default(), @@ -676,6 +680,8 @@ impl BlockSync { let mut responses = self.responses.write(); // Acquire the write lock on the request timestamps map. let mut request_timestamps = self.request_timestamps.write(); + // Acquire the write lock on the locators map. + let mut locators = self.locators.write(); // Retrieve the current time. let now = Instant::now(); @@ -686,6 +692,8 @@ impl BlockSync { // Track the number of timed out block requests. let mut num_timed_out_block_requests = 0; + let mut peers_to_ban: HashSet = HashSet::new(); + // Remove timed out block requests. request_timestamps.retain(|height, timestamp| { let is_obsolete = *height < current_height; @@ -700,6 +708,17 @@ impl BlockSync { // If the request has timed out, or is obsolete, then remove it. if is_timeout || is_obsolete { trace!("Block request {height} has timed out: is_time_passed = {is_time_passed}, is_request_incomplete = {is_request_incomplete}, is_obsolete = {is_obsolete}"); + + if let Some((_, _, peer_ips)) = requests.get(height) { peer_ips.iter().for_each(|peer_ip| { + debug!("Removing peer {peer_ip} from block request {height}"); + // Remove the locators entry for the given peer IP. + locators.swap_remove(peer_ip); + if is_timeout { + peers_to_ban.insert(*peer_ip); + } + }); + } + // Remove the request entry for the given height. requests.remove(height); // Remove the response entry for the given height. @@ -711,6 +730,18 @@ impl BlockSync { !is_timeout && !is_obsolete }); + // After the retain loop, handle the banning of peers + for peer_ip in peers_to_ban { + trace!("Banning peer {peer_ip} for timing out on block requests"); + self.tcp.banned_peers().update_ip_ban(peer_ip.ip()); + + let tcp = self.tcp.clone(); + tokio::spawn(async move { + tcp.disconnect(peer_ip).await; + trace!("Peer disconnected!"); + }); + } + num_timed_out_block_requests } @@ -952,6 +983,7 @@ mod tests { use snarkvm::prelude::{Field, TestRng}; use indexmap::{indexset, IndexSet}; + use snarkos_node_tcp::Config; use snarkvm::ledger::committee::Committee; use std::net::{IpAddr, Ipv4Addr}; @@ -976,7 +1008,15 @@ mod tests { /// Returns the sync pool, with the canonical ledger initialized to the given height. fn sample_sync_at_height(height: u32) -> BlockSync { - BlockSync::::new(BlockSyncMode::Router, Arc::new(sample_ledger_service(height))) + BlockSync::::new(BlockSyncMode::Router, Arc::new(sample_ledger_service(height)), sample_tcp()) + } + + fn sample_tcp() -> Tcp { + Tcp::new(Config { + listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + max_connections: 200, + ..Default::default() + }) } /// Checks that the sync pool (starting at genesis) returns the correct requests. diff --git a/node/tcp/src/helpers/banned_peers.rs b/node/tcp/src/helpers/banned_peers.rs index d1d3a28a90..76f5294811 100644 --- a/node/tcp/src/helpers/banned_peers.rs +++ b/node/tcp/src/helpers/banned_peers.rs @@ -16,20 +16,24 @@ use std::{collections::HashMap, net::IpAddr, time::Instant}; use parking_lot::RwLock; -use tracing::trace; /// Contains the ban details for a banned peer. +#[derive(Clone)] pub struct BanConfig { /// The time when the ban was created. banned_at: Instant, /// Amount of times the peer has been banned. - banned_count: u8, + count: u8, } impl BanConfig { /// Creates a new ban config. - pub fn new(count: u8) -> Self { - Self { banned_at: Instant::now(), banned_count: count } + pub fn new(banned_at: Instant, count: u8) -> Self { + Self { banned_at, count } + } + + pub fn update_count(&mut self) { + self.count += 1; } } @@ -45,22 +49,36 @@ impl BannedPeers { /// Get ban count for the given IP address. pub fn get_ban_count(&self, ip: IpAddr) -> Option { - self.0.read().get(&ip).map(|delay| delay.banned_count) + self.0.read().get(&ip).map(|delay| delay.count) + } + + /// Get all banned IPs. + pub fn get_banned_ips(&self) -> Vec { + self.0.read().keys().cloned().collect() + } + + /// Get ban config for the given IP address. + pub fn get_ban_config(&self, ip: IpAddr) -> Option { + self.0.read().get(&ip).cloned() } /// Insert or update a banned IP. pub fn update_ip_ban(&self, ip: IpAddr) { - let count = self.get_ban_count(ip).unwrap_or(0).saturating_add(1u8); - - trace!("Banning IP: {:?} with count: {}", ip, count); - let ban_config = BanConfig::new(count); - self.0.write().insert(ip, ban_config); + if let Some(config) = self.get_ban_config(ip) { + let mut config = config; + config.update_count(); + self.0.write().insert(ip, config); + } else { + self.0.write().insert(ip, BanConfig::new(Instant::now(), 1u8)); + } } /// Remove the expired entries pub fn remove_old_bans(&self, ban_time_in_secs: u64) { self.0.write().retain(|_, ban_config| { - ban_config.banned_at.elapsed().as_secs() < (ban_time_in_secs << ban_config.banned_count.max(32)) + let shift = ban_config.count.min(32); + let ban_duration = ban_time_in_secs.checked_shl(shift as u32).unwrap_or(u64::MAX); + ban_config.banned_at.elapsed().as_secs() < ban_duration }); } } From bd63d832655ad6110f108e2211abbd961d27843e Mon Sep 17 00:00:00 2001 From: zkxuerb Date: Mon, 21 Oct 2024 21:11:50 +0200 Subject: [PATCH 3/5] removes exponential ban and ban on re connect --- node/bft/src/gateway.rs | 1 - node/router/src/handshake.rs | 1 - node/tcp/src/helpers/banned_peers.rs | 26 ++++---------------------- 3 files changed, 4 insertions(+), 24 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index b72fb74f64..265cf7928b 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -1142,7 +1142,6 @@ impl Handshake for Gateway { if self.dev().is_none() && peer_side == ConnectionSide::Initiator { // If the IP is already banned, update the ban timestamp and reject the connection. if self.is_ip_banned(peer_addr.ip()) { - self.update_ip_ban(peer_addr.ip()); trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip()); return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); } diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index f412227705..5315d49bc7 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -106,7 +106,6 @@ impl Router { if !self.is_dev() && peer_side == ConnectionSide::Initiator { // If the IP is already banned, update the ban timestamp and reject the connection. if self.is_ip_banned(peer_addr.ip()) { - self.update_ip_ban(peer_addr.ip()); trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip()); return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); } diff --git a/node/tcp/src/helpers/banned_peers.rs b/node/tcp/src/helpers/banned_peers.rs index 76f5294811..7f1aceb5ca 100644 --- a/node/tcp/src/helpers/banned_peers.rs +++ b/node/tcp/src/helpers/banned_peers.rs @@ -22,19 +22,14 @@ use parking_lot::RwLock; pub struct BanConfig { /// The time when the ban was created. banned_at: Instant, - /// Amount of times the peer has been banned. - count: u8, } impl BanConfig { /// Creates a new ban config. - pub fn new(banned_at: Instant, count: u8) -> Self { - Self { banned_at, count } + pub fn new(banned_at: Instant) -> Self { + Self { banned_at } } - pub fn update_count(&mut self) { - self.count += 1; - } } /// Contains the set of peers currently banned by IP. @@ -47,11 +42,6 @@ impl BannedPeers { self.0.read().contains_key(ip) } - /// Get ban count for the given IP address. - pub fn get_ban_count(&self, ip: IpAddr) -> Option { - self.0.read().get(&ip).map(|delay| delay.count) - } - /// Get all banned IPs. pub fn get_banned_ips(&self) -> Vec { self.0.read().keys().cloned().collect() @@ -64,21 +54,13 @@ impl BannedPeers { /// Insert or update a banned IP. pub fn update_ip_ban(&self, ip: IpAddr) { - if let Some(config) = self.get_ban_config(ip) { - let mut config = config; - config.update_count(); - self.0.write().insert(ip, config); - } else { - self.0.write().insert(ip, BanConfig::new(Instant::now(), 1u8)); - } + self.0.write().insert(ip, BanConfig::new(Instant::now())); } /// Remove the expired entries pub fn remove_old_bans(&self, ban_time_in_secs: u64) { self.0.write().retain(|_, ban_config| { - let shift = ban_config.count.min(32); - let ban_duration = ban_time_in_secs.checked_shl(shift as u32).unwrap_or(u64::MAX); - ban_config.banned_at.elapsed().as_secs() < ban_duration + ban_config.banned_at.elapsed().as_secs() < ban_time_in_secs }); } } From 846244db7ec750e07986467dfc4b665fdf9fad60 Mon Sep 17 00:00:00 2001 From: zkxuerb Date: Wed, 23 Oct 2024 23:21:01 +0200 Subject: [PATCH 4/5] remove connection attempt timestamp check --- node/bft/src/gateway.rs | 24 ++++++++++-------------- node/router/src/handshake.rs | 23 +++++++++-------------- node/router/src/lib.rs | 2 +- 3 files changed, 20 insertions(+), 29 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 265cf7928b..362f010d18 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -93,7 +93,7 @@ const MAX_VALIDATORS_TO_SEND: usize = 200; /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. #[cfg(not(any(test, feature = "test")))] -const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10; +const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; /// The amount of time an IP address is prohibited from connecting. const IP_BAN_TIME_IN_SECS: u64 = 300; @@ -1140,26 +1140,22 @@ impl Handshake for Gateway { // Check (or impose) IP-level bans. #[cfg(not(any(test, feature = "test")))] if self.dev().is_none() && peer_side == ConnectionSide::Initiator { - // If the IP is already banned, update the ban timestamp and reject the connection. + // If the IP is already banned reject the connection. if self.is_ip_banned(peer_addr.ip()) { trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip()); return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); } - // Check the previous low-level connection timestamp. - if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) { - let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), 10); + let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS); - debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); - debug!("Seconds since connection attempt: {}", peer_stats.timestamp().elapsed().as_secs()); - if peer_stats.timestamp().elapsed().as_secs() <= MIN_CONNECTION_INTERVAL_IN_SECS - && num_attempts >= MAX_CONNECTION_ATTEMPTS - { - self.update_ip_ban(peer_addr.ip()); - trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip()); - return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); - } + debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); + if num_attempts >= MAX_CONNECTION_ATTEMPTS + { + self.update_ip_ban(peer_addr.ip()); + trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); } + } let stream = self.borrow_stream(&mut connection); diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 5315d49bc7..54df55e78e 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -104,25 +104,20 @@ impl Router { // Check (or impose) IP-level bans. #[cfg(not(any(test, feature = "test")))] if !self.is_dev() && peer_side == ConnectionSide::Initiator { - // If the IP is already banned, update the ban timestamp and reject the connection. + // If the IP is already banned reject the connection. if self.is_ip_banned(peer_addr.ip()) { trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip()); return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); } - // Check the previous low-level connection timestamp. - if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) { - let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), 10); - - debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); - debug!("Seconds since connection attempt: {}", peer_stats.timestamp().elapsed().as_secs()); - if peer_stats.timestamp().elapsed().as_secs() <= Router::::MIN_CONNECTION_INTERVAL_IN_SECS - && num_attempts >= Router::::MAX_CONNECTION_ATTEMPTS - { - self.update_ip_ban(peer_addr.ip()); - trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip()); - return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); - } + let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), Router::::CONNECTION_ATTEMPTS_SINCE_SECS); + + debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); + if num_attempts >= Router::::MAX_CONNECTION_ATTEMPTS + { + self.update_ip_ban(peer_addr.ip()); + trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); } } diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index b484aa6bb8..78aa1f5a26 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -112,7 +112,7 @@ impl Router { const MAX_CONNECTION_ATTEMPTS: usize = 10; /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. #[cfg(not(any(test, feature = "test")))] - const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10; + const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; /// The duration in seconds after which a connected peer is considered inactive or /// disconnected if no message has been received in the meantime. const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes From 2bdc60b8c18d09318ac445f7c9fc7abc6557daa7 Mon Sep 17 00:00:00 2001 From: zkxuerb Date: Fri, 1 Nov 2024 15:17:44 +0100 Subject: [PATCH 5/5] resolved nits --- node/bft/src/gateway.rs | 4 +--- node/bft/src/sync/mod.rs | 2 +- node/router/src/handshake.rs | 6 +++--- node/router/src/heartbeat.rs | 11 ++--------- node/router/src/lib.rs | 12 +++++++----- node/sync/Cargo.toml | 2 +- node/sync/src/block_sync.rs | 13 ++++++------- node/tcp/src/helpers/banned_peers.rs | 25 ++++++++++++++----------- node/tcp/src/helpers/known_peers.rs | 2 +- node/tcp/src/tcp.rs | 2 -- 10 files changed, 36 insertions(+), 43 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index abccb19f81..cf6a909d6b 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -1165,13 +1165,11 @@ impl Handshake for Gateway { let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS); debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); - if num_attempts >= MAX_CONNECTION_ATTEMPTS - { + if num_attempts > MAX_CONNECTION_ATTEMPTS { self.update_ip_ban(peer_addr.ip()); trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip()); return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); } - } let stream = self.borrow_stream(&mut connection); diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index a75f0893ff..9f4642124c 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -23,7 +23,7 @@ use crate::{ }; use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event}; use snarkos_node_bft_ledger_service::LedgerService; -use snarkos_node_sync::{locators::BlockLocators, BlockSync, BlockSyncMode}; +use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators}; use snarkos_node_tcp::P2P; use snarkvm::{ console::{network::Network, types::Field}, diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 97d8410761..7c59e017f4 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -110,11 +110,11 @@ impl Router { return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); } - let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), Router::::CONNECTION_ATTEMPTS_SINCE_SECS); + let num_attempts = + self.cache.insert_inbound_connection(peer_addr.ip(), Router::::CONNECTION_ATTEMPTS_SINCE_SECS); debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts); - if num_attempts >= Router::::MAX_CONNECTION_ATTEMPTS - { + if num_attempts > Router::::MAX_CONNECTION_ATTEMPTS { self.update_ip_ban(peer_addr.ip()); trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip()); return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index 061057f200..a1ce8296a9 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -21,7 +21,7 @@ use crate::{ use snarkvm::prelude::Network; use colored::Colorize; -use rand::{prelude::IteratorRandom, rngs::OsRng, Rng}; +use rand::{Rng, prelude::IteratorRandom, rngs::OsRng}; /// A helper function to compute the maximum of two numbers. /// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391. @@ -237,16 +237,9 @@ pub trait Heartbeat: Outbound { if num_deficient > 0 { // Initialize an RNG. let rng = &mut OsRng; - let banned_ips = self.tcp().banned_peers().get_banned_ips(); // Attempt to connect to more peers. - for peer_ip in self - .router() - .candidate_peers() - .into_iter() - .filter(|peer| !banned_ips.contains(&peer.ip())) - .choose_multiple(rng, num_deficient) - { + for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) { self.router().connect(peer_ip); } diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index a91df5911b..0f22a61ad6 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -41,7 +41,8 @@ pub use routing::*; use crate::messages::NodeType; use snarkos_account::Account; -use snarkos_node_tcp::{Config, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip}; +use snarkos_node_tcp::{Config, P2P, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip}; + use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; use anyhow::{Result, bail}; @@ -105,6 +106,9 @@ pub struct InnerRouter { } impl Router { + /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. + #[cfg(not(any(test, feature = "test")))] + const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; /// The maximum number of candidate peers permitted to be stored in the node. const MAXIMUM_CANDIDATE_PEERS: usize = 10_000; /// The maximum number of connection failures permitted by an inbound connecting peer. @@ -112,9 +116,6 @@ impl Router { /// The maximum amount of connection attempts withing a 10 second threshold #[cfg(not(any(test, feature = "test")))] const MAX_CONNECTION_ATTEMPTS: usize = 10; - /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. - #[cfg(not(any(test, feature = "test")))] - const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; /// The duration in seconds after which a connected peer is considered inactive or /// disconnected if no message has been received in the meantime. const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes @@ -395,7 +396,8 @@ impl Router { /// Returns the list of candidate peers. pub fn candidate_peers(&self) -> HashSet { - self.candidate_peers.read().clone() + let banned_ips = self.tcp().banned_peers().get_banned_ips(); + self.candidate_peers.read().iter().filter(|peer| !banned_ips.contains(&peer.ip())).copied().collect() } /// Returns the list of restricted peers. diff --git a/node/sync/Cargo.toml b/node/sync/Cargo.toml index 9b3caac48f..206c6b00be 100644 --- a/node/sync/Cargo.toml +++ b/node/sync/Cargo.toml @@ -68,7 +68,7 @@ version = "=3.0.0" [dependencies.snarkos-node-tcp] path = "../tcp" -version = "=2.2.7" +version = "=3.0.0" [dependencies.snarkvm] workspace = true diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index f78295089c..00b405c9e9 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -22,7 +22,7 @@ use snarkos_node_router::messages::DataBlocks; use snarkos_node_sync_communication_service::CommunicationService; use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS}; use snarkos_node_tcp::Tcp; -use snarkvm::prelude::{block::Block, Network}; +use snarkvm::prelude::{Network, block::Block}; use anyhow::{Result, bail, ensure}; use indexmap::{IndexMap, IndexSet}; @@ -30,7 +30,7 @@ use itertools::Itertools; use parking_lot::{Mutex, RwLock}; use rand::{CryptoRng, Rng, prelude::IteratorRandom}; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ Arc, @@ -95,7 +95,7 @@ pub struct BlockSync { tcp: Tcp, /// The map of peer IP to their block locators. /// The block locators are consistent with the canonical map and every other peer's block locators. - locators: Arc>>>, + locators: Arc>>>, /// The map of peer-to-peer to their common ancestor. /// This map is used to determine which peers to request blocks from. common_ancestors: Arc>>, @@ -447,7 +447,7 @@ impl BlockSync { /// Removes the peer from the sync pool, if they exist. pub fn remove_peer(&self, peer_ip: &SocketAddr) { // Remove the locators entry for the given peer IP. - self.locators.write().swap_remove(peer_ip); + self.locators.write().remove(peer_ip); // Remove all block requests to the peer. self.remove_block_requests_to_peer(peer_ip); } @@ -712,7 +712,7 @@ impl BlockSync { if let Some((_, _, peer_ips)) = requests.get(height) { peer_ips.iter().for_each(|peer_ip| { debug!("Removing peer {peer_ip} from block request {height}"); // Remove the locators entry for the given peer IP. - locators.swap_remove(peer_ip); + locators.remove(peer_ip); if is_timeout { peers_to_ban.insert(*peer_ip); } @@ -738,7 +738,6 @@ impl BlockSync { let tcp = self.tcp.clone(); tokio::spawn(async move { tcp.disconnect(peer_ip).await; - trace!("Peer disconnected!"); }); } @@ -982,7 +981,7 @@ mod tests { use snarkos_node_bft_ledger_service::MockLedgerService; use snarkvm::prelude::{Field, TestRng}; - use indexmap::{indexset, IndexSet}; + use indexmap::{IndexSet, indexset}; use snarkos_node_tcp::Config; use snarkvm::ledger::committee::Committee; use std::net::{IpAddr, Ipv4Addr}; diff --git a/node/tcp/src/helpers/banned_peers.rs b/node/tcp/src/helpers/banned_peers.rs index 7f1aceb5ca..466013bc44 100644 --- a/node/tcp/src/helpers/banned_peers.rs +++ b/node/tcp/src/helpers/banned_peers.rs @@ -19,22 +19,27 @@ use parking_lot::RwLock; /// Contains the ban details for a banned peer. #[derive(Clone)] -pub struct BanConfig { +pub struct BanDetails { /// The time when the ban was created. banned_at: Instant, } -impl BanConfig { - /// Creates a new ban config. - pub fn new(banned_at: Instant) -> Self { - Self { banned_at } +impl BanDetails { + /// Creates a new ban at the given time. + pub fn new() -> Self { + Self { banned_at: Instant::now() } } +} +impl Default for BanDetails { + fn default() -> Self { + Self::new() + } } /// Contains the set of peers currently banned by IP. #[derive(Default)] -pub struct BannedPeers(RwLock>); +pub struct BannedPeers(RwLock>); impl BannedPeers { /// Check whether the given IP address is currently banned. @@ -48,19 +53,17 @@ impl BannedPeers { } /// Get ban config for the given IP address. - pub fn get_ban_config(&self, ip: IpAddr) -> Option { + pub fn get_ban_config(&self, ip: IpAddr) -> Option { self.0.read().get(&ip).cloned() } /// Insert or update a banned IP. pub fn update_ip_ban(&self, ip: IpAddr) { - self.0.write().insert(ip, BanConfig::new(Instant::now())); + self.0.write().insert(ip, BanDetails::default()); } /// Remove the expired entries pub fn remove_old_bans(&self, ban_time_in_secs: u64) { - self.0.write().retain(|_, ban_config| { - ban_config.banned_at.elapsed().as_secs() < ban_time_in_secs - }); + self.0.write().retain(|_, ban_config| ban_config.banned_at.elapsed().as_secs() < ban_time_in_secs); } } diff --git a/node/tcp/src/helpers/known_peers.rs b/node/tcp/src/helpers/known_peers.rs index a50241b01e..0d9264559f 100644 --- a/node/tcp/src/helpers/known_peers.rs +++ b/node/tcp/src/helpers/known_peers.rs @@ -14,7 +14,7 @@ // limitations under the License. use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{HashMap, hash_map::Entry}, net::IpAddr, sync::Arc, time::Instant, diff --git a/node/tcp/src/tcp.rs b/node/tcp/src/tcp.rs index 474d5505d9..a0a55820dc 100644 --- a/node/tcp/src/tcp.rs +++ b/node/tcp/src/tcp.rs @@ -38,8 +38,6 @@ use tokio::{ use tracing::*; use crate::{ - connections::{Connection, ConnectionSide, Connections}, - protocols::{Protocol, Protocols}, BannedPeers, Config, KnownPeers,