Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Stability] Garbage collect polls for all views < latest-2 #2226

Merged
merged 5 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 69 additions & 50 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use serde::{Deserialize, Serialize};
use surf_disco::Url;

use hotshot_types::traits::network::ViewMessage;
use std::collections::BTreeMap;
use std::{
collections::{hash_map::Entry, BTreeSet, HashMap},
collections::{btree_map::Entry, BTreeSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand Down Expand Up @@ -119,25 +120,25 @@ struct Inner<TYPES: NodeType> {

/// Task map for quorum proposals.
proposal_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for quorum votes.
vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID disperse data
vid_disperse_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for DACs.
dac_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for view sync certificates.
view_sync_cert_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for view sync votes.
view_sync_vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for transactions
txn_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Arc<RwLock<BTreeMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
}

impl<TYPES: NodeType> Inner<TYPES> {
Expand Down Expand Up @@ -330,6 +331,7 @@ impl<TYPES: NodeType> Inner<TYPES> {
ConsensusIntentEvent::CancelPollForVotes(event_view)
| ConsensusIntentEvent::CancelPollForProposal(event_view)
| ConsensusIntentEvent::CancelPollForDAC(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncCertificate(event_view)
| ConsensusIntentEvent::CancelPollForVIDDisperse(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncVotes(event_view) => {
if view_number == event_view {
Expand Down Expand Up @@ -768,8 +770,6 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
);

// TODO ED Need to handle canceling tasks that don't receive their expected output (such a proposal that never comes)
// TODO ED Need to GC all old views, not just singular views, could lead to a network leak

match event {
ConsensusIntentEvent::PollForProposal(view_number) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also GC the view sync stuff here, it's possible we just don't hit view sync for a long time, and we'd be running the view sync polling task for a ton of views. In fact if maybe we can just cancel all old task regardless of what we are getting the new polling intent for. Basically every time we start to poll for a new view number we can GC old view task from all the task maps

Copy link
Collaborator Author

@rob-maron rob-maron Dec 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this but I have to be careful; certain things like transactions and leaders we poll way ahead for. If we start polling for txes in view n+3, polls for other things in view n would get cancelled before we have everything resolved

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that's a good point we need to be careful. I was more thinking only for the big things, like proposal and view sync certificates. On second thought maybe it's best to have a specific event like CancelOldPolls which we inject when we update our view that cancels the old view polling. That way we have one entry point for GCing but don't have to piggy back on the old events which might have weirdness like you mentioned.

// Check if we already have a task for this (we shouldn't)
Expand Down Expand Up @@ -799,15 +799,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&view_number.wrapping_sub(2)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForProposal(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForProposal(*view))
Comment on lines +802 to +808
Copy link
Collaborator

@bfish713 bfish713 Dec 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit Can we make this a function that takes the event, maybe as a fn like event_fn: fn(view: u64) -> ConsensusIntentEvent and then the function could just do range.map(event_fn). Mostly just making this a function so this code isn't copy pasted a bunch of times

.await;
}
}
Expand Down Expand Up @@ -839,15 +837,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&view_number.wrapping_sub(2)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVIDDisperse(*view))
.await;
}
}
Expand Down Expand Up @@ -894,16 +890,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
// TODO ED This won't work for vote collection, last task is more than 2 view ago depending on size of network, will need to rely on cancel task from consensus
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVotes(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForVotes(*view))
.await;
}
}
Expand Down Expand Up @@ -932,15 +925,13 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForDAC(
view_number.wrapping_sub(2),
))
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForDAC(*view))
.await;
}
}
Expand Down Expand Up @@ -986,7 +977,17 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// TODO ED Do we need to GC before returning? Or will view sync task handle that?
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncCertificate(
*view,
))
.await;
}
rob-maron marked this conversation as resolved.
Show resolved Hide resolved
}
ConsensusIntentEvent::PollForViewSyncVotes(view_number) => {
let mut task_map = self.inner.view_sync_vote_task_map.write().await;
Expand Down Expand Up @@ -1015,6 +1016,16 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
} else {
debug!("Somehow task already existed!");
}

// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForViewSyncVotes(*view))
.await;
}
}

ConsensusIntentEvent::CancelPollForViewSyncCertificate(view_number) => {
Expand Down Expand Up @@ -1047,7 +1058,7 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
}
ConsensusIntentEvent::PollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
if let std::collections::hash_map::Entry::Vacant(e) = task_map.entry(view_number) {
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
Expand All @@ -1069,7 +1080,15 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
debug!("Somehow task already existed!");
}

// TODO ED Do we need to GC before returning? Or will view sync task handle that?
// Remove all entries in the task map that are polling for a view less than or equal to `view_number - 2`.
let view_minus_2 = view_number.saturating_sub(2);
let range = task_map.range(..view_minus_2);
for (view, task) in range {
// Cancel the old task by sending a message to it. If the task already exited we expect an error
let _res = task
.send(ConsensusIntentEvent::CancelPollForTransactions(*view))
.await;
}
}
ConsensusIntentEvent::CancelPollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
Expand Down
3 changes: 2 additions & 1 deletion crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
let high_qc = self.consensus.read().await.high_qc.clone();
let leader_view = high_qc.get_view_number() + 1;
if self.quorum_membership.get_leader(leader_view) == self.public_key {
self.publish_proposal_if_able(high_qc, leader_view, None).await;
self.publish_proposal_if_able(high_qc, leader_view, None)
.await;
}
}
_ => {}
Expand Down
5 changes: 5 additions & 0 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ impl<
subscribe_view,
))
.await;
// Also subscribe to the latest view for the same reason. The GC will remove the above poll
// in the case that one doesn't resolve but this one does.
self.network
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.await;

self.network
.inject_consensus_info(ConsensusIntentEvent::PollForDAC(subscribe_view))
Expand Down