Skip to content

Commit

Permalink
feat: Introduce p2p slot table limit and limit allowed ingress slots …
Browse files Browse the repository at this point in the history
…per peer (#1213)

To support hashes in blocks, change #1061 modifies the ingress pool in a way
that would allow the unvalidated ingress pool to grow without bound. P2P can
bound the unvalidated pool via the slot table, but until now the slot table
itself had no bound. This PR introduces a config option to bound the slot
table and sets a hard per-peer bound of `50_000` slots for the ingress pool.
All other clients use `usize::MAX`, i.e. remain effectively unbounded.
  • Loading branch information
tthebst authored Sep 27, 2024
1 parent d9ae74c commit 735935a
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 14 deletions.
4 changes: 4 additions & 0 deletions rs/p2p/consensus_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl ConsensusManagerBuilder {
outbound_artifacts_rx: Receiver<ArtifactTransmit<Artifact>>,
inbound_artifacts_tx: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
(assembler, assembler_router): (F, Router),
slot_limit: usize,
) {
assert!(uri_prefix::<WireArtifact>()
.chars()
Expand All @@ -78,6 +79,7 @@ impl ConsensusManagerBuilder {
assembler(transport.clone()),
transport,
topology_watcher,
slot_limit,
)
};

Expand Down Expand Up @@ -121,6 +123,7 @@ fn start_consensus_manager<Artifact, WireArtifact, Assembler>(
assembler: Assembler,
transport: Arc<dyn Transport>,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
) -> Vec<Shutdown>
where
Artifact: IdentifiableArtifact,
Expand All @@ -146,6 +149,7 @@ where
assembler,
sender,
topology_watcher,
slot_limit,
);
vec![shutdown_send_side, shutdown_receive_side]
}
Expand Down
9 changes: 9 additions & 0 deletions rs/p2p/consensus_manager/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) struct ConsensusManagerMetrics {
// Slot table
pub slot_table_updates_total: IntCounter,
pub slot_table_updates_with_artifact_total: IntCounter,
pub slot_table_limit_exceeded_total: IntCounter,
pub slot_table_overwrite_total: IntCounter,
pub slot_table_stale_total: IntCounter,
pub slot_table_new_entry_total: IntCounterVec,
Expand Down Expand Up @@ -105,6 +106,14 @@ impl ConsensusManagerMetrics {
))
.unwrap(),
),
slot_table_limit_exceeded_total: metrics_registry.register(
IntCounter::with_opts(opts!(
"ic_consensus_manager_slot_table_limit_exceeded_total",
"Slot updates that would exceed slot table limit.",
const_labels.clone(),
))
.unwrap(),
),
slot_table_updates_with_artifact_total: metrics_registry.register(
IntCounter::with_opts(opts!(
"ic_consensus_manager_slot_table_updates_with_artifact_total",
Expand Down
77 changes: 70 additions & 7 deletions rs/p2p/consensus_manager/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ pub(crate) struct ConsensusManagerReceiver<
artifact_processor_tasks: JoinSet<(watch::Receiver<PeerCounter>, WireArtifact::Id)>,

topology_watcher: watch::Receiver<SubnetTopology>,

slot_limit: usize,
}

#[allow(unused)]
Expand All @@ -220,6 +222,7 @@ where
artifact_assembler: Assembler,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
) -> Shutdown {
let receive_manager = Self {
log,
Expand All @@ -232,6 +235,7 @@ where
slot_table: HashMap::new(),
artifact_processor_tasks: JoinSet::new(),
topology_watcher,
slot_limit,
};

Shutdown::spawn_on_with_cancellation(
Expand Down Expand Up @@ -363,12 +367,9 @@ where
id: id.clone(),
};

let (to_add, to_remove) = match self
.slot_table
.entry(peer_id)
.or_default()
.entry(slot_number)
{
let peer_slot_table = self.slot_table.entry(peer_id).or_default();
let peer_slot_table_len = peer_slot_table.len();
let (to_add, to_remove) = match peer_slot_table.entry(slot_number) {
Entry::Occupied(mut slot_entry_mut) => {
if slot_entry_mut.get().should_be_replaced(&new_slot_entry) {
self.metrics.slot_table_overwrite_total.inc();
Expand All @@ -379,14 +380,25 @@ where
(false, None)
}
}
Entry::Vacant(empty_slot) => {
// Only insert slot update if we are below peer slot table limit.
Entry::Vacant(empty_slot) if peer_slot_table_len < self.slot_limit => {
empty_slot.insert(new_slot_entry);
self.metrics
.slot_table_new_entry_total
.with_label_values(&[peer_id.to_string().as_str()])
.inc();
(true, None)
}
Entry::Vacant(_) => {
self.metrics.slot_table_limit_exceeded_total.inc();
warn!(
self.log,
"Peer {} tries to exceed slot limit {}. Dropping slot update",
peer_id,
self.slot_limit
);
(false, None)
}
};

if to_add {
Expand Down Expand Up @@ -607,6 +619,7 @@ mod tests {
sender: UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
artifact_assembler: MockArtifactAssembler,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,

channels: Channels,
}
Expand Down Expand Up @@ -645,6 +658,7 @@ mod tests {
sender,
topology_watcher,
artifact_assembler,
slot_limit: usize::MAX,
channels: Channels {
unvalidated_artifact_receiver,
},
Expand All @@ -659,6 +673,11 @@ mod tests {
self
}

fn with_slot_limit(mut self, slot_limit: usize) -> Self {
self.slot_limit = slot_limit;
self
}

fn with_artifact_assembler_maker(
mut self,
make_mock: fn() -> MockArtifactAssembler,
Expand All @@ -682,6 +701,7 @@ mod tests {
active_assembles: HashMap::new(),
slot_table: HashMap::new(),
artifact_processor_tasks: JoinSet::new(),
slot_limit: self.slot_limit,
});

(consensus_manager_receiver, self.channels)
Expand Down Expand Up @@ -822,6 +842,49 @@ mod tests {
assert_eq!(mgr.artifact_processor_tasks.len(), 1);
}

    /// Verifies that a peer cannot occupy more than `slot_limit` slots: with a
    /// limit of 2, the first two slot updates (for distinct slots) are accepted
    /// and a third update targeting a fresh slot is dropped, leaving both the
    /// peer's slot table and the set of active assembles unchanged.
    #[tokio::test]
    async fn slot_table_limit_exceeded() {
        let (mut mgr, _channels) = ReceiverManagerBuilder::new().with_slot_limit(2).build();
        let cancellation = CancellationToken::new();

        // First update: occupies slot 1 — below the limit, so it is accepted.
        mgr.handle_advert_receive(
            SlotUpdate {
                slot_number: SlotNumber::from(1),
                commit_id: CommitId::from(1),
                update: Update::Id(0),
            },
            NODE_1,
            ConnId::from(1),
            cancellation.clone(),
        );
        // Second update: occupies slot 2 — exactly fills the limit of 2.
        mgr.handle_advert_receive(
            SlotUpdate {
                slot_number: SlotNumber::from(2),
                commit_id: CommitId::from(2),
                update: Update::Id(1),
            },
            NODE_1,
            ConnId::from(1),
            cancellation.clone(),
        );
        // One peer in the table, holding both of its allowed slots; each
        // accepted update started an assemble task.
        assert_eq!(mgr.slot_table.len(), 1);
        assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
        assert_eq!(mgr.active_assembles.len(), 2);
        // Third update targets a NEW slot (3) while the peer is already at the
        // limit — the receiver must drop it rather than grow the table.
        mgr.handle_advert_receive(
            SlotUpdate {
                slot_number: SlotNumber::from(3),
                commit_id: CommitId::from(3),
                update: Update::Id(2),
            },
            NODE_1,
            ConnId::from(1),
            cancellation.clone(),
        );
        // Slot table and assembles are unchanged: the over-limit update had no effect.
        assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
        assert_eq!(mgr.active_assembles.len(), 2);
    }

/// Check that adverts updates with higher connection ids take precedence.
#[tokio::test]
async fn overwrite_slot1() {
Expand Down
7 changes: 6 additions & 1 deletion rs/p2p/test_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,12 @@ pub fn start_consensus_manager(
bouncer_factory,
MetricsRegistry::default(),
);
cm1.add_client(artifact_manager_event_rx, artifact_sender, downloader);
cm1.add_client(
artifact_manager_event_rx,
artifact_sender,
downloader,
usize::MAX,
);
(artifact_processor_jh, cm1)
}

Expand Down
1 change: 1 addition & 0 deletions rs/p2p/test_utils/src/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ pub fn add_transport_to_sim<F>(
artifact_manager_event_rx,
artifact_sender,
downloader,
usize::MAX,
);
router = Some(router.unwrap_or_default().merge(consensus_builder.router()));

Expand Down
21 changes: 15 additions & 6 deletions rs/replica/setup_ic_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ use tokio::sync::{mpsc::UnboundedSender, watch};
use tower_http::trace::TraceLayer;

pub const MAX_ADVERT_BUFFER: usize = 100_000;
/// This limit is used to protect against a malicious peer advertising many ingress messages.
/// If no malicious peers are present the ingress pools are bounded by a separate limit.
const SLOT_TABLE_LIMIT_INGRESS: usize = 50_000;
const SLOT_TABLE_NO_LIMIT: usize = usize::MAX;

/// The collection of all artifact pools.
struct ArtifactPools {
Expand Down Expand Up @@ -345,7 +349,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
new_p2p_consensus.add_client(consensus_rx, client, assembler);
new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

let ingress_sender = {
Expand All @@ -369,7 +373,12 @@ fn start_consensus(
metrics_registry.clone(),
);

new_p2p_consensus.add_client(ingress_rx, client.clone(), assembler);
new_p2p_consensus.add_client(
ingress_rx,
client.clone(),
assembler,
SLOT_TABLE_LIMIT_INGRESS,
);
client
};

Expand Down Expand Up @@ -403,7 +412,7 @@ fn start_consensus(
Arc::new(bouncer),
metrics_registry.clone(),
);
new_p2p_consensus.add_client(certification_rx, client, assembler);
new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

{
Expand Down Expand Up @@ -432,7 +441,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
new_p2p_consensus.add_client(dkg_rx, client, assembler);
new_p2p_consensus.add_client(dkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

{
Expand Down Expand Up @@ -480,7 +489,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
new_p2p_consensus.add_client(idkg_rx, client, assembler);
new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

{
Expand Down Expand Up @@ -514,7 +523,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
new_p2p_consensus.add_client(http_outcalls_rx, client, assembler);
new_p2p_consensus.add_client(http_outcalls_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

(
Expand Down

0 comments on commit 735935a

Please sign in to comment.