diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs
index 1370f4eccdd..ea5189dba8a 100644
--- a/rs/p2p/consensus_manager/src/lib.rs
+++ b/rs/p2p/consensus_manager/src/lib.rs
@@ -57,6 +57,7 @@ impl ConsensusManagerBuilder {
         outbound_artifacts_rx: Receiver>,
         inbound_artifacts_tx: UnboundedSender>,
         (assembler, assembler_router): (F, Router),
+        slot_limit: usize,
     ) {
         assert!(uri_prefix::()
             .chars()
@@ -78,6 +79,7 @@ impl ConsensusManagerBuilder {
                 assembler(transport.clone()),
                 transport,
                 topology_watcher,
+                slot_limit,
             )
         };
 
@@ -121,6 +123,7 @@ fn start_consensus_manager(
     assembler: Assembler,
     transport: Arc,
     topology_watcher: watch::Receiver,
+    slot_limit: usize,
 ) -> Vec
 where
     Artifact: IdentifiableArtifact,
@@ -146,6 +149,7 @@ where
         assembler,
         sender,
         topology_watcher,
+        slot_limit,
     );
     vec![shutdown_send_side, shutdown_receive_side]
 }
diff --git a/rs/p2p/consensus_manager/src/metrics.rs b/rs/p2p/consensus_manager/src/metrics.rs
index 15185ec2231..3acf3c33aad 100644
--- a/rs/p2p/consensus_manager/src/metrics.rs
+++ b/rs/p2p/consensus_manager/src/metrics.rs
@@ -22,6 +22,7 @@ pub(crate) struct ConsensusManagerMetrics {
     // Slot table
     pub slot_table_updates_total: IntCounter,
     pub slot_table_updates_with_artifact_total: IntCounter,
+    pub slot_table_limit_exceeded_total: IntCounter,
     pub slot_table_overwrite_total: IntCounter,
     pub slot_table_stale_total: IntCounter,
     pub slot_table_new_entry_total: IntCounterVec,
@@ -105,6 +106,14 @@ impl ConsensusManagerMetrics {
                 ))
                 .unwrap(),
             ),
+            slot_table_limit_exceeded_total: metrics_registry.register(
+                IntCounter::with_opts(opts!(
+                    "ic_consensus_manager_slot_table_limit_exceeded_total",
+                    "Slot updates that would exceed slot table limit.",
+                    const_labels.clone(),
+                ))
+                .unwrap(),
+            ),
             slot_table_updates_with_artifact_total: metrics_registry.register(
                 IntCounter::with_opts(opts!(
                     "ic_consensus_manager_slot_table_updates_with_artifact_total",
diff --git a/rs/p2p/consensus_manager/src/receiver.rs b/rs/p2p/consensus_manager/src/receiver.rs
index 34894cc2211..a4aca8cd80d 100644
--- a/rs/p2p/consensus_manager/src/receiver.rs
+++ b/rs/p2p/consensus_manager/src/receiver.rs
@@ -197,6 +197,8 @@ pub(crate) struct ConsensusManagerReceiver<
     artifact_processor_tasks: JoinSet<(watch::Receiver, WireArtifact::Id)>,
 
     topology_watcher: watch::Receiver,
+
+    slot_limit: usize,
 }
 
 #[allow(unused)]
@@ -220,6 +222,7 @@ where
         artifact_assembler: Assembler,
         sender: UnboundedSender>,
         topology_watcher: watch::Receiver,
+        slot_limit: usize,
     ) -> Shutdown {
         let receive_manager = Self {
             log,
@@ -232,6 +235,7 @@ where
             slot_table: HashMap::new(),
             artifact_processor_tasks: JoinSet::new(),
             topology_watcher,
+            slot_limit,
         };
 
         Shutdown::spawn_on_with_cancellation(
@@ -363,12 +367,9 @@ where
             id: id.clone(),
         };
 
-        let (to_add, to_remove) = match self
-            .slot_table
-            .entry(peer_id)
-            .or_default()
-            .entry(slot_number)
-        {
+        let peer_slot_table = self.slot_table.entry(peer_id).or_default();
+        let peer_slot_table_len = peer_slot_table.len();
+        let (to_add, to_remove) = match peer_slot_table.entry(slot_number) {
             Entry::Occupied(mut slot_entry_mut) => {
                 if slot_entry_mut.get().should_be_replaced(&new_slot_entry) {
                     self.metrics.slot_table_overwrite_total.inc();
@@ -379,7 +380,8 @@ where
                     (false, None)
                 }
             }
-            Entry::Vacant(empty_slot) => {
+            // Only insert slot update if we are below peer slot table limit.
+            Entry::Vacant(empty_slot) if peer_slot_table_len < self.slot_limit => {
                 empty_slot.insert(new_slot_entry);
                 self.metrics
                     .slot_table_new_entry_total
@@ -387,6 +389,16 @@ where
                     .inc();
                 (true, None)
             }
+            Entry::Vacant(_) => {
+                self.metrics.slot_table_limit_exceeded_total.inc();
+                warn!(
+                    self.log,
+                    "Peer {} tries to exceed slot limit {}. Dropping slot update",
+                    peer_id,
+                    self.slot_limit
+                );
+                (false, None)
+            }
         };
 
         if to_add {
@@ -607,6 +619,7 @@ mod tests {
         sender: UnboundedSender>,
        artifact_assembler: MockArtifactAssembler,
         topology_watcher: watch::Receiver,
+        slot_limit: usize,
         channels: Channels,
     }
 
@@ -645,6 +658,7 @@ mod tests {
                 sender,
                 topology_watcher,
                 artifact_assembler,
+                slot_limit: usize::MAX,
                 channels: Channels {
                     unvalidated_artifact_receiver,
                 },
@@ -659,6 +673,11 @@ mod tests {
             self
         }
 
+        fn with_slot_limit(mut self, slot_limit: usize) -> Self {
+            self.slot_limit = slot_limit;
+            self
+        }
+
        fn with_artifact_assembler_maker(
             mut self,
             make_mock: fn() -> MockArtifactAssembler,
@@ -682,6 +701,7 @@ mod tests {
                 active_assembles: HashMap::new(),
                 slot_table: HashMap::new(),
                 artifact_processor_tasks: JoinSet::new(),
+                slot_limit: self.slot_limit,
             });
 
             (consensus_manager_receiver, self.channels)
@@ -822,6 +842,49 @@ mod tests {
         assert_eq!(mgr.artifact_processor_tasks.len(), 1);
     }
 
+    #[tokio::test]
+    async fn slot_table_limit_exceeded() {
+        let (mut mgr, _channels) = ReceiverManagerBuilder::new().with_slot_limit(2).build();
+        let cancellation = CancellationToken::new();
+
+        mgr.handle_advert_receive(
+            SlotUpdate {
+                slot_number: SlotNumber::from(1),
+                commit_id: CommitId::from(1),
+                update: Update::Id(0),
+            },
+            NODE_1,
+            ConnId::from(1),
+            cancellation.clone(),
+        );
+        mgr.handle_advert_receive(
+            SlotUpdate {
+                slot_number: SlotNumber::from(2),
+                commit_id: CommitId::from(2),
+                update: Update::Id(1),
+            },
+            NODE_1,
+            ConnId::from(1),
+            cancellation.clone(),
+        );
+        assert_eq!(mgr.slot_table.len(), 1);
+        assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
+        assert_eq!(mgr.active_assembles.len(), 2);
+        // Send slot update that exceeds limit
+        mgr.handle_advert_receive(
+            SlotUpdate {
+                slot_number: SlotNumber::from(3),
+                commit_id: CommitId::from(3),
+                update: Update::Id(2),
+            },
+            NODE_1,
+            ConnId::from(1),
+            cancellation.clone(),
+        );
+        assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
+        assert_eq!(mgr.active_assembles.len(), 2);
+    }
+
     /// Check that adverts updates with higher connection ids take precedence.
     #[tokio::test]
     async fn overwrite_slot1() {
diff --git a/rs/p2p/test_utils/src/lib.rs b/rs/p2p/test_utils/src/lib.rs
index 5e544249b32..7d749c41c3d 100644
--- a/rs/p2p/test_utils/src/lib.rs
+++ b/rs/p2p/test_utils/src/lib.rs
@@ -468,7 +468,12 @@ pub fn start_consensus_manager(
         bouncer_factory,
         MetricsRegistry::default(),
     );
-    cm1.add_client(artifact_manager_event_rx, artifact_sender, downloader);
+    cm1.add_client(
+        artifact_manager_event_rx,
+        artifact_sender,
+        downloader,
+        usize::MAX,
+    );
 
     (artifact_processor_jh, cm1)
 }
diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs
index a8287a78337..3096d4453ab 100644
--- a/rs/p2p/test_utils/src/turmoil.rs
+++ b/rs/p2p/test_utils/src/turmoil.rs
@@ -388,6 +388,7 @@ pub fn add_transport_to_sim(
                 artifact_manager_event_rx,
                 artifact_sender,
                 downloader,
+                usize::MAX,
             );
 
             router = Some(router.unwrap_or_default().merge(consensus_builder.router()));
diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs
index db99ad43a9c..a90265aea14 100644
--- a/rs/replica/setup_ic_network/src/lib.rs
+++ b/rs/replica/setup_ic_network/src/lib.rs
@@ -61,6 +61,10 @@ use tokio::sync::{mpsc::UnboundedSender, watch};
 use tower_http::trace::TraceLayer;
 
 pub const MAX_ADVERT_BUFFER: usize = 100_000;
+/// This limit is used to protect against a malicious peer advertising many ingress messages.
+/// If no malicious peers are present the ingress pools are bounded by a separate limit.
+const SLOT_TABLE_LIMIT_INGRESS: usize = 50_000;
+const SLOT_TABLE_NO_LIMIT: usize = usize::MAX;
 
 /// The collection of all artifact pools.
 struct ArtifactPools {
@@ -345,7 +349,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
         );
-        new_p2p_consensus.add_client(consensus_rx, client, assembler);
+        new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     let ingress_sender = {
@@ -369,7 +373,12 @@ fn start_consensus(
             metrics_registry.clone(),
         );
 
-        new_p2p_consensus.add_client(ingress_rx, client.clone(), assembler);
+        new_p2p_consensus.add_client(
+            ingress_rx,
+            client.clone(),
+            assembler,
+            SLOT_TABLE_LIMIT_INGRESS,
+        );
         client
     };
 
@@ -403,7 +412,7 @@ fn start_consensus(
             Arc::new(bouncer),
             metrics_registry.clone(),
        );
-        new_p2p_consensus.add_client(certification_rx, client, assembler);
+        new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     {
@@ -432,7 +441,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
         );
-        new_p2p_consensus.add_client(dkg_rx, client, assembler);
+        new_p2p_consensus.add_client(dkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     {
@@ -480,7 +489,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
         );
-        new_p2p_consensus.add_client(idkg_rx, client, assembler);
+        new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     {
@@ -514,7 +523,7 @@ fn start_consensus(
             bouncer,
             metrics_registry.clone(),
         );
-        new_p2p_consensus.add_client(http_outcalls_rx, client, assembler);
+        new_p2p_consensus.add_client(http_outcalls_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
     };
 
     (
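
The guard added in `receiver.rs` is the core of the change: a peer's slot table only accepts a *new* slot while it holds fewer than `slot_limit` entries, whereas overwrites of already-occupied slots are never blocked. Below is a minimal, self-contained sketch of that behaviour under simplified assumptions; `PeerId`, `SlotNumber`, `SlotEntry`, and `SlotTable` are stand-in types for illustration only, not the types or APIs used in the IC codebase.

```rust
use std::collections::{hash_map::Entry, HashMap};

// Stand-in types; the real code uses node IDs, slot numbers, and slot entries
// carrying commit and connection IDs.
type PeerId = u64;
type SlotNumber = u64;
type SlotEntry = u64;

struct SlotTable {
    // One slot table per peer.
    table: HashMap<PeerId, HashMap<SlotNumber, SlotEntry>>,
    // Maximum number of distinct slots a single peer may occupy.
    slot_limit: usize,
}

impl SlotTable {
    /// Returns true if the update was stored, false if it was dropped because
    /// the peer already occupies `slot_limit` distinct slots.
    fn update(&mut self, peer: PeerId, slot: SlotNumber, entry: SlotEntry) -> bool {
        let peer_table = self.table.entry(peer).or_default();
        let len = peer_table.len();
        match peer_table.entry(slot) {
            // Overwriting an existing slot never hits the limit check.
            Entry::Occupied(mut occupied) => {
                occupied.insert(entry);
                true
            }
            // A new slot is only accepted while the peer is below the limit.
            Entry::Vacant(vacant) if len < self.slot_limit => {
                vacant.insert(entry);
                true
            }
            // Otherwise the update is dropped (the real code bumps a metric and logs a warning).
            Entry::Vacant(_) => false,
        }
    }
}

fn main() {
    let mut slots = SlotTable {
        table: HashMap::new(),
        slot_limit: 2,
    };
    assert!(slots.update(1, 1, 10)); // first distinct slot: accepted
    assert!(slots.update(1, 2, 20)); // second distinct slot: accepted
    assert!(!slots.update(1, 3, 30)); // third distinct slot: dropped (limit = 2)
    assert!(slots.update(1, 2, 21)); // overwrite of an existing slot: still accepted
}
```

Under this scheme a peer can occupy at most `slot_limit` distinct slots, which is why the replica wiring passes `SLOT_TABLE_LIMIT_INGRESS` (50_000) for the ingress client and `SLOT_TABLE_NO_LIMIT` for the consensus, certification, DKG, IDKG, and HTTP-outcalls clients.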