Merge branch 'master' into marko-fix-ic-version-rc-only
marko-k0 committed Sep 27, 2024
2 parents 3c8c5ae + 735935a commit d35e117
Showing 6 changed files with 105 additions and 14 deletions.
4 changes: 4 additions & 0 deletions rs/p2p/consensus_manager/src/lib.rs
@@ -57,6 +57,7 @@ impl ConsensusManagerBuilder {
outbound_artifacts_rx: Receiver<ArtifactTransmit<Artifact>>,
inbound_artifacts_tx: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
(assembler, assembler_router): (F, Router),
slot_limit: usize,
) {
assert!(uri_prefix::<WireArtifact>()
.chars()
@@ -78,6 +79,7 @@ impl ConsensusManagerBuilder {
assembler(transport.clone()),
transport,
topology_watcher,
slot_limit,
)
};

@@ -121,6 +123,7 @@ fn start_consensus_manager<Artifact, WireArtifact, Assembler>(
assembler: Assembler,
transport: Arc<dyn Transport>,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
) -> Vec<Shutdown>
where
Artifact: IdentifiableArtifact,
@@ -146,6 +149,7 @@ where
assembler,
sender,
topology_watcher,
slot_limit,
);
vec![shutdown_send_side, shutdown_receive_side]
}
9 changes: 9 additions & 0 deletions rs/p2p/consensus_manager/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ pub(crate) struct ConsensusManagerMetrics {
// Slot table
pub slot_table_updates_total: IntCounter,
pub slot_table_updates_with_artifact_total: IntCounter,
pub slot_table_limit_exceeded_total: IntCounter,
pub slot_table_overwrite_total: IntCounter,
pub slot_table_stale_total: IntCounter,
pub slot_table_new_entry_total: IntCounterVec,
@@ -105,6 +106,14 @@ impl ConsensusManagerMetrics {
))
.unwrap(),
),
slot_table_limit_exceeded_total: metrics_registry.register(
IntCounter::with_opts(opts!(
"ic_consensus_manager_slot_table_limit_exceeded_total",
"Slot updates that would exceed slot table limit.",
const_labels.clone(),
))
.unwrap(),
),
slot_table_updates_with_artifact_total: metrics_registry.register(
IntCounter::with_opts(opts!(
"ic_consensus_manager_slot_table_updates_with_artifact_total",
77 changes: 70 additions & 7 deletions rs/p2p/consensus_manager/src/receiver.rs
@@ -197,6 +197,8 @@ pub(crate) struct ConsensusManagerReceiver<
artifact_processor_tasks: JoinSet<(watch::Receiver<PeerCounter>, WireArtifact::Id)>,

topology_watcher: watch::Receiver<SubnetTopology>,

slot_limit: usize,
}

#[allow(unused)]
@@ -220,6 +222,7 @@ where
artifact_assembler: Assembler,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
) -> Shutdown {
let receive_manager = Self {
log,
@@ -232,6 +235,7 @@ where
slot_table: HashMap::new(),
artifact_processor_tasks: JoinSet::new(),
topology_watcher,
slot_limit,
};

Shutdown::spawn_on_with_cancellation(
@@ -363,12 +367,9 @@ where
id: id.clone(),
};

- let (to_add, to_remove) = match self
-     .slot_table
-     .entry(peer_id)
-     .or_default()
-     .entry(slot_number)
- {
+ let peer_slot_table = self.slot_table.entry(peer_id).or_default();
+ let peer_slot_table_len = peer_slot_table.len();
+ let (to_add, to_remove) = match peer_slot_table.entry(slot_number) {
Entry::Occupied(mut slot_entry_mut) => {
if slot_entry_mut.get().should_be_replaced(&new_slot_entry) {
self.metrics.slot_table_overwrite_total.inc();
@@ -379,14 +380,25 @@
(false, None)
}
}
- Entry::Vacant(empty_slot) => {
+ // Only insert slot update if we are below peer slot table limit.
+ Entry::Vacant(empty_slot) if peer_slot_table_len < self.slot_limit => {
empty_slot.insert(new_slot_entry);
self.metrics
.slot_table_new_entry_total
.with_label_values(&[peer_id.to_string().as_str()])
.inc();
(true, None)
}
Entry::Vacant(_) => {
self.metrics.slot_table_limit_exceeded_total.inc();
warn!(
self.log,
"Peer {} tries to exceed slot limit {}. Dropping slot update",
peer_id,
self.slot_limit
);
(false, None)
}
};

if to_add {
@@ -607,6 +619,7 @@
sender: UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
artifact_assembler: MockArtifactAssembler,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,

channels: Channels,
}
@@ -645,6 +658,7 @@
sender,
topology_watcher,
artifact_assembler,
slot_limit: usize::MAX,
channels: Channels {
unvalidated_artifact_receiver,
},
@@ -659,6 +673,11 @@
self
}

fn with_slot_limit(mut self, slot_limit: usize) -> Self {
self.slot_limit = slot_limit;
self
}

fn with_artifact_assembler_maker(
mut self,
make_mock: fn() -> MockArtifactAssembler,
@@ -682,6 +701,7 @@
active_assembles: HashMap::new(),
slot_table: HashMap::new(),
artifact_processor_tasks: JoinSet::new(),
slot_limit: self.slot_limit,
});

(consensus_manager_receiver, self.channels)
@@ -822,6 +842,49 @@
assert_eq!(mgr.artifact_processor_tasks.len(), 1);
}

#[tokio::test]
async fn slot_table_limit_exceeded() {
let (mut mgr, _channels) = ReceiverManagerBuilder::new().with_slot_limit(2).build();
let cancellation = CancellationToken::new();

mgr.handle_advert_receive(
SlotUpdate {
slot_number: SlotNumber::from(1),
commit_id: CommitId::from(1),
update: Update::Id(0),
},
NODE_1,
ConnId::from(1),
cancellation.clone(),
);
mgr.handle_advert_receive(
SlotUpdate {
slot_number: SlotNumber::from(2),
commit_id: CommitId::from(2),
update: Update::Id(1),
},
NODE_1,
ConnId::from(1),
cancellation.clone(),
);
assert_eq!(mgr.slot_table.len(), 1);
assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
assert_eq!(mgr.active_assembles.len(), 2);
// Send slot update that exceeds limit
mgr.handle_advert_receive(
SlotUpdate {
slot_number: SlotNumber::from(3),
commit_id: CommitId::from(3),
update: Update::Id(2),
},
NODE_1,
ConnId::from(1),
cancellation.clone(),
);
assert_eq!(mgr.slot_table.get(&NODE_1).unwrap().len(), 2);
assert_eq!(mgr.active_assembles.len(), 2);
}

/// Check that advert updates with higher connection ids take precedence.
#[tokio::test]
async fn overwrite_slot1() {
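The essence of the receiver change above is a per-peer cap enforced at insertion time. Below is a minimal, self-contained sketch of that admission rule in plain Rust; the types and names are illustrative only, and the crate's real SlotEntry, metrics, and logging are omitted:

use std::collections::{hash_map::Entry, HashMap};

type PeerId = u64;
type SlotNumber = u64;
type CommitId = u64;

/// Minimal model of the bounded per-peer slot table (illustrative names).
struct SlotTable {
    slots: HashMap<PeerId, HashMap<SlotNumber, CommitId>>,
    slot_limit: usize,
}

impl SlotTable {
    /// Returns true if the update was accepted, false if it was dropped
    /// because the peer already occupies `slot_limit` distinct slots.
    fn insert(&mut self, peer: PeerId, slot: SlotNumber, commit_id: CommitId) -> bool {
        let peer_slots = self.slots.entry(peer).or_default();
        let len = peer_slots.len();
        match peer_slots.entry(slot) {
            // Overwriting an occupied slot never grows the table, so it is always allowed.
            Entry::Occupied(mut occupied) => {
                occupied.insert(commit_id);
                true
            }
            // A new slot number is only admitted while the peer is below the limit.
            Entry::Vacant(vacant) if len < self.slot_limit => {
                vacant.insert(commit_id);
                true
            }
            // Anything else would exceed the per-peer bound and is dropped.
            Entry::Vacant(_) => false,
        }
    }
}

fn main() {
    let mut table = SlotTable { slots: HashMap::new(), slot_limit: 2 };
    assert!(table.insert(1, 1, 1));
    assert!(table.insert(1, 2, 2));
    assert!(!table.insert(1, 3, 3)); // third distinct slot: dropped
    assert!(table.insert(1, 2, 9)); // overwrite of slot 2: still accepted
    println!("per-peer slot table stays at {} entries", table.slots[&1].len());
}

Note that overwrites of an already-occupied slot are always accepted, since they cannot grow the table; only a new slot number beyond the cap is rejected, which is the case where the real implementation increments ic_consensus_manager_slot_table_limit_exceeded_total and logs a warning.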
7 changes: 6 additions & 1 deletion rs/p2p/test_utils/src/lib.rs
@@ -468,7 +472,12 @@ pub fn start_consensus_manager(
bouncer_factory,
MetricsRegistry::default(),
);
- cm1.add_client(artifact_manager_event_rx, artifact_sender, downloader);
+ cm1.add_client(
+     artifact_manager_event_rx,
+     artifact_sender,
+     downloader,
+     usize::MAX,
+ );
(artifact_processor_jh, cm1)
}

1 change: 1 addition & 0 deletions rs/p2p/test_utils/src/turmoil.rs
@@ -388,6 +388,7 @@ pub fn add_transport_to_sim<F>(
artifact_manager_event_rx,
artifact_sender,
downloader,
usize::MAX,
);
router = Some(router.unwrap_or_default().merge(consensus_builder.router()));

21 changes: 15 additions & 6 deletions rs/replica/setup_ic_network/src/lib.rs
@@ -61,6 +61,10 @@ use tokio::sync::{mpsc::UnboundedSender, watch};
use tower_http::trace::TraceLayer;

pub const MAX_ADVERT_BUFFER: usize = 100_000;
/// This limit is used to protect against a malicious peer advertising many ingress messages.
/// If no malicious peers are present, the ingress pools are bounded by a separate limit.
const SLOT_TABLE_LIMIT_INGRESS: usize = 50_000;
const SLOT_TABLE_NO_LIMIT: usize = usize::MAX;

/// The collection of all artifact pools.
struct ArtifactPools {
@@ -345,7 +349,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
- new_p2p_consensus.add_client(consensus_rx, client, assembler);
+ new_p2p_consensus.add_client(consensus_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

let ingress_sender = {
@@ -369,7 +373,12 @@ fn start_consensus(
metrics_registry.clone(),
);

- new_p2p_consensus.add_client(ingress_rx, client.clone(), assembler);
+ new_p2p_consensus.add_client(
+     ingress_rx,
+     client.clone(),
+     assembler,
+     SLOT_TABLE_LIMIT_INGRESS,
+ );
client
};

Expand Down Expand Up @@ -403,7 +412,7 @@ fn start_consensus(
Arc::new(bouncer),
metrics_registry.clone(),
);
- new_p2p_consensus.add_client(certification_rx, client, assembler);
+ new_p2p_consensus.add_client(certification_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

{
@@ -432,7 +441,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
- new_p2p_consensus.add_client(dkg_rx, client, assembler);
+ new_p2p_consensus.add_client(dkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

{
@@ -480,7 +489,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
- new_p2p_consensus.add_client(idkg_rx, client, assembler);
+ new_p2p_consensus.add_client(idkg_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

{
@@ -514,7 +523,7 @@ fn start_consensus(
bouncer,
metrics_registry.clone(),
);
- new_p2p_consensus.add_client(http_outcalls_rx, client, assembler);
+ new_p2p_consensus.add_client(http_outcalls_rx, client, assembler, SLOT_TABLE_NO_LIMIT);
};

(
