From 735935aa2c63f18e4a391956173c5c3a00f6af66 Mon Sep 17 00:00:00 2001
From: tim gretler
Date: Fri, 27 Sep 2024 10:52:58 +0200
Subject: [PATCH] feat: Introduce p2p slot table limit and limit allowed ingress slots per peer (#1213)

To support hashes in blocks, the ingress pool modifications in #1061 would
allow the unvalidated ingress pool to grow without bound. P2P can bound the
unvalidated pool via the slot table, but until now the slot table itself had
no limit. This MR introduces a config option that bounds the slot table per
peer and sets a hard bound of `50_000` for the ingress pool. Other clients
use `usize::MAX` as their bound.
---
 rs/p2p/consensus_manager/src/lib.rs      |  4 ++
 rs/p2p/consensus_manager/src/metrics.rs  |  9 +++
 rs/p2p/consensus_manager/src/receiver.rs | 77 +++++++++++++++++++++---
 rs/p2p/test_utils/src/lib.rs             |  7 ++-
 rs/p2p/test_utils/src/turmoil.rs         |  1 +
 rs/replica/setup_ic_network/src/lib.rs   | 21 +++++--
 6 files changed, 105 insertions(+), 14 deletions(-)

diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs
index 1370f4eccdd..ea5189dba8a 100644
--- a/rs/p2p/consensus_manager/src/lib.rs
+++ b/rs/p2p/consensus_manager/src/lib.rs
@@ -57,6 +57,7 @@ impl ConsensusManagerBuilder {
         outbound_artifacts_rx: Receiver>,
         inbound_artifacts_tx: UnboundedSender>,
         (assembler, assembler_router): (F, Router),
+        slot_limit: usize,
     ) {
         assert!(uri_prefix::()
             .chars()
@@ -78,6 +79,7 @@ impl ConsensusManagerBuilder {
                 assembler(transport.clone()),
                 transport,
                 topology_watcher,
+                slot_limit,
             )
         };
 
@@ -121,6 +123,7 @@ fn start_consensus_manager(
     assembler: Assembler,
     transport: Arc,
     topology_watcher: watch::Receiver,
+    slot_limit: usize,
 ) -> Vec
 where
     Artifact: IdentifiableArtifact,
@@ -146,6 +149,7 @@ where
         assembler,
         sender,
         topology_watcher,
+        slot_limit,
     );
     vec![shutdown_send_side, shutdown_receive_side]
 }
diff --git a/rs/p2p/consensus_manager/src/metrics.rs b/rs/p2p/consensus_manager/src/metrics.rs
index 15185ec2231..3acf3c33aad 100644
--- a/rs/p2p/consensus_manager/src/metrics.rs
+++ b/rs/p2p/consensus_manager/src/metrics.rs
@@ -22,6 +22,7 @@ pub(crate) struct ConsensusManagerMetrics {
     // Slot table
     pub slot_table_updates_total: IntCounter,
     pub slot_table_updates_with_artifact_total: IntCounter,
+    pub slot_table_limit_exceeded_total: IntCounter,
     pub slot_table_overwrite_total: IntCounter,
     pub slot_table_stale_total: IntCounter,
     pub slot_table_new_entry_total: IntCounterVec,
@@ -105,6 +106,14 @@ impl ConsensusManagerMetrics {
                 ))
                 .unwrap(),
             ),
+            slot_table_limit_exceeded_total: metrics_registry.register(
+                IntCounter::with_opts(opts!(
+                    "ic_consensus_manager_slot_table_limit_exceeded_total",
+                    "Slot updates that would exceed slot table limit.",
+                    const_labels.clone(),
+                ))
+                .unwrap(),
+            ),
             slot_table_updates_with_artifact_total: metrics_registry.register(
                 IntCounter::with_opts(opts!(
                     "ic_consensus_manager_slot_table_updates_with_artifact_total",
diff --git a/rs/p2p/consensus_manager/src/receiver.rs b/rs/p2p/consensus_manager/src/receiver.rs
index 34894cc2211..a4aca8cd80d 100644
--- a/rs/p2p/consensus_manager/src/receiver.rs
+++ b/rs/p2p/consensus_manager/src/receiver.rs
@@ -197,6 +197,8 @@ pub(crate) struct ConsensusManagerReceiver<
     artifact_processor_tasks: JoinSet<(watch::Receiver, WireArtifact::Id)>,
 
     topology_watcher: watch::Receiver,
+
+    slot_limit: usize,
 }
 
 #[allow(unused)]
@@ -220,6 +222,7 @@ where
         artifact_assembler: Assembler,
         sender: UnboundedSender>,
         topology_watcher: watch::Receiver,
+        slot_limit: usize,
     ) -> Shutdown {
         let receive_manager = Self {
             log,
@@ -232,6 +235,7 @@ where
             slot_table: HashMap::new(),
             artifact_processor_tasks: JoinSet::new(),
             topology_watcher,
+            slot_limit,
         };
 
         Shutdown::spawn_on_with_cancellation(
@@ -363,12 +367,9 @@ where
             id: id.clone(),
         };
 
-        let (to_add, to_remove) = match self
-            .slot_table
-            .entry(peer_id)
-            .or_default()
-            .entry(slot_number)
-        {
+        let peer_slot_table = self.slot_table.entry(peer_id).or_default();
+        let peer_slot_table_len = peer_slot_table.len();
+        let (to_add, to_remove) = match peer_slot_table.entry(slot_number) {
             Entry::Occupied(mut slot_entry_mut) => {
                 if slot_entry_mut.get().should_be_replaced(&new_slot_entry) {
                     self.metrics.slot_table_overwrite_total.inc();
@@ -379,7 +380,8 @@ where
                     (false, None)
                 }
             }
-            Entry::Vacant(empty_slot) => {
+            // Only insert slot update if we are below peer slot table limit.
+            Entry::Vacant(empty_slot) if peer_slot_table_len < self.slot_limit => {
                 empty_slot.insert(new_slot_entry);
                 self.metrics
                     .slot_table_new_entry_total
@@ -387,6 +389,16 @@ where
                     .inc();
                 (true, None)
             }
+            Entry::Vacant(_) => {
+                self.metrics.slot_table_limit_exceeded_total.inc();
+                warn!(
+                    self.log,
+                    "Peer {} tries to exceed slot limit {}. Dropping slot update",
+                    peer_id,
+                    self.slot_limit
+                );
+                (false, None)
+            }
         };
 
         if to_add {
@@ -607,6 +619,7 @@ mod tests {
         sender: UnboundedSender>,
         artifact_assembler: MockArtifactAssembler,
         topology_watcher: watch::Receiver,
+        slot_limit: usize,
         channels: Channels,
     }
 
@@ -645,6 +658,7 @@ mod tests {
                 sender,
                 topology_watcher,
                 artifact_assembler,
+                slot_limit: usize::MAX,
                 channels: Channels {
                     unvalidated_artifact_receiver,
                 },
@@ -659,6 +673,11 @@ mod tests {
             self
         }
 
+        fn with_slot_limit(mut self, slot_limit: usize) -> Self {
+            self.slot_limit = slot_limit;
+            self
+        }
+
         fn with_artifact_assembler_maker(
             mut self,
             make_mock: fn() -> MockArtifactAssembler,
@@ -682,6 +701,7 @@ mod tests {
                 active_assembles: HashMap::new(),
                 slot_table: HashMap::new(),
                 artifact_processor_tasks: JoinSet::new(),
+                slot_limit: self.slot_limit,
             });
 
             (consensus_manager_receiver, self.channels)
@@ -822,6 +842,49 @@ mod tests {
         assert_eq!(mgr.artifact_processor_tasks.len(), 1);
     }
 
+    #[tokio::test]
+    async fn slot_table_limit_exceeded() {
+        let (mut mgr, _channels) = ReceiverManagerBuilder::new().with_slot_limit(2).build();
+        let cancellation = CancellationToken::new();
+
+        mgr.handle_advert_receive(
+            SlotUpdate {
+                slot_number: SlotNumber::from(1),
+                commit_id: CommitId::from(1),
+                update: Update::Id(0),
+            },
+            NODE_1,
+            ConnId::from(1),
+            cancellation.clone(),
+        );
+        mgr.handle_advert_receive(
+            SlotUpdate {
+                slot_number: SlotNumber::from(2),
+                commit_id: CommitId::from(2),
+                update: Update::Id(1),
+            },
+            NODE_1,
+            ConnId::from(1),
+            cancellation.clone(),
+        );
+        assert_eq!(mgr.slot_table.len(), 1);
+        assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
+        assert_eq!(mgr.active_assembles.len(), 2);
+        // Send slot update that exceeds limit
+        mgr.handle_advert_receive(
+            SlotUpdate {
+                slot_number: SlotNumber::from(3),
+                commit_id: CommitId::from(3),
+                update: Update::Id(2),
+            },
+            NODE_1,
+            ConnId::from(1),
+            cancellation.clone(),
+        );
+        assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
+        assert_eq!(mgr.active_assembles.len(), 2);
+    }
+
     /// Check that adverts updates with higher connection ids take precedence.
     #[tokio::test]
     async fn overwrite_slot1() {
diff --git a/rs/p2p/test_utils/src/lib.rs b/rs/p2p/test_utils/src/lib.rs
index 5e544249b32..7d749c41c3d 100644
--- a/rs/p2p/test_utils/src/lib.rs
+++ b/rs/p2p/test_utils/src/lib.rs
@@ -468,7 +468,12 @@ pub fn start_consensus_manager(
         bouncer_factory,
         MetricsRegistry::default(),
     );
-    cm1.add_client(artifact_manager_event_rx, artifact_sender, downloader);
+    cm1.add_client(
+        artifact_manager_event_rx,
+        artifact_sender,
+        downloader,
+        usize::MAX,
+    );
 
     (artifact_processor_jh, cm1)
 }
diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs
index a8287a78337..3096d4453ab 100644
--- a/rs/p2p/test_utils/src/turmoil.rs
+++ b/rs/p2p/test_utils/src/turmoil.rs
@@ -388,6 +388,7 @@ pub fn add_transport_to_sim(
                 artifact_manager_event_rx,
                 artifact_sender,
                 downloader,
+                usize::MAX,
             );
 
             router = Some(router.unwrap_or_default().merge(consensus_builder.router()));
diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs
index db99ad43a9c..a90265aea14 100644
--- a/rs/replica/setup_ic_network/src/lib.rs
+++ b/rs/replica/setup_ic_network/src/lib.rs
@@ -61,6 +61,10 @@ use tokio::sync::{mpsc::UnboundedSender, watch};
 use tower_http::trace::TraceLayer;
 
 pub const MAX_ADVERT_BUFFER: usize = 100_000;
+/// This limit is used to protect against a malicious peer advertising many ingress messages.
+/// If no malicious peers are present the ingress pools are bounded by a separate limit.
+const SLOT_TABLE_LIMIT_INGRESS: usize = 50_000;
+const SLOT_TABLE_NO_LIMIT: usize = usize::MAX;
 
 /// The collection of all artifact pools.
 struct ArtifactPools {
@@ -345,7 +349,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
         );
-        new_p2p_consensus.add_client(consensus_rx, client, assembler);
+        new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     let ingress_sender = {
@@ -369,7 +373,12 @@ fn start_consensus(
             metrics_registry.clone(),
         );
 
-        new_p2p_consensus.add_client(ingress_rx, client.clone(), assembler);
+        new_p2p_consensus.add_client(
+            ingress_rx,
+            client.clone(),
+            assembler,
+            SLOT_TABLE_LIMIT_INGRESS,
+        );
         client
     };
 
@@ -403,7 +412,7 @@ fn start_consensus(
             Arc::new(bouncer),
             metrics_registry.clone(),
         );
-        new_p2p_consensus.add_client(certification_rx, client, assembler);
+        new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     {
@@ -432,7 +441,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
        );
-        new_p2p_consensus.add_client(dkg_rx, client, assembler);
+        new_p2p_consensus.add_client(dkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     {
@@ -480,7 +489,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
         );
-        new_p2p_consensus.add_client(idkg_rx, client, assembler);
+        new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     {
@@ -514,7 +523,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
        );
-        new_p2p_consensus.add_client(http_outcalls_rx, client, assembler);
+        new_p2p_consensus.add_client(http_outcalls_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
    };
 
     (
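
The behavioral core of this change is the guarded `Entry::Vacant` arm in `handle_advert_receive`: overwriting an occupied slot is always allowed, but a new slot is only accepted while the peer occupies fewer than `slot_limit` slots. Below is a minimal, standalone sketch of that bounded per-peer insertion pattern; `PeerId`, `SlotNumber` and `SlotEntry` here are simplified stand-ins for illustration, not the actual IC types.

    // Standalone sketch of the per-peer slot table bound (simplified types, not IC code).
    use std::collections::{hash_map::Entry, HashMap};

    type PeerId = u64;
    type SlotNumber = u64;

    struct SlotEntry {
        commit_id: u64,
    }

    struct SlotTable {
        slot_limit: usize,
        slots: HashMap<PeerId, HashMap<SlotNumber, SlotEntry>>,
    }

    impl SlotTable {
        fn new(slot_limit: usize) -> Self {
            Self { slot_limit, slots: HashMap::new() }
        }

        /// Returns true if the update was stored. Overwriting an occupied slot is always
        /// allowed; a new slot is only accepted while the peer is below `slot_limit`.
        fn update(&mut self, peer: PeerId, slot: SlotNumber, entry: SlotEntry) -> bool {
            let peer_slots = self.slots.entry(peer).or_default();
            let len = peer_slots.len();
            match peer_slots.entry(slot) {
                Entry::Occupied(mut occupied) => {
                    occupied.insert(entry);
                    true
                }
                Entry::Vacant(vacant) if len < self.slot_limit => {
                    vacant.insert(entry);
                    true
                }
                // Would exceed the per-peer limit: drop the update.
                Entry::Vacant(_) => false,
            }
        }
    }

    fn main() {
        let mut table = SlotTable::new(2);
        assert!(table.update(1, 1, SlotEntry { commit_id: 1 }));
        assert!(table.update(1, 2, SlotEntry { commit_id: 2 }));
        // A third distinct slot for the same peer is rejected ...
        assert!(!table.update(1, 3, SlotEntry { commit_id: 3 }));
        // ... but overwriting an already occupied slot still succeeds.
        assert!(table.update(1, 1, SlotEntry { commit_id: 4 }));
        println!("peer 1, slot 1 commit: {}", table.slots[&1][&1].commit_id);
    }

This mirrors the behavior exercised by the new `slot_table_limit_exceeded` test: overwrites keep working, while the number of slots held by any single peer never exceeds the configured limit.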