Skip to content

Commit

Permalink
[CATCHUP] Fetch Proposal when Missing (#3310)
Browse files Browse the repository at this point in the history
* Respond with proposal if we have it

* add response chan to missing proposal event

* Send proposal back out to requester

* do the fetch in when parent proposal is missing

* add timeout to request for data

* always send back response to chan

* simplify match

* spawn request task

* update maps, add timeout

* don't always fetch

* fix test

* fix upgrade off by one

* merge

* fix test again after merge

* fix dependency

* merge

* lint for dep version

* Combine Serialize and Sign to one fn

* Don't always spawn request

* rename
  • Loading branch information
bfish713 authored Jun 14, 2024
1 parent 9b642d3 commit 549d41f
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 80 deletions.
2 changes: 1 addition & 1 deletion crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
add_network_message_task(&mut handle, Arc::clone(&da_network)).await;

if let Some(request_receiver) = da_network.spawn_request_receiver_task().await {
add_response_task(&mut handle, request_receiver).await;
add_request_network_task(&mut handle).await;
add_response_task(&mut handle, request_receiver).await;
}

add_network_event_task(
Expand Down
116 changes: 102 additions & 14 deletions crates/task-impls/src/consensus/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use crate::{events::HotShotEvent, helpers::broadcast_event};

#[cfg(not(feature = "dependency-tasks"))]
use core::time::Duration;
#[cfg(not(feature = "dependency-tasks"))]
use std::marker::PhantomData;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use crate::{events::ProposalMissing, request::REQUEST_TIMEOUT};

#[cfg(not(feature = "dependency-tasks"))]
use super::ConsensusTaskState;
Expand All @@ -14,12 +10,15 @@ use crate::{
consensus::{update_view, view_change::SEND_VIEW_CHANGE_EVENT},
helpers::AnyhowTracing,
};
use crate::{events::HotShotEvent, helpers::broadcast_event};
#[cfg(not(feature = "dependency-tasks"))]
use anyhow::bail;
use anyhow::{ensure, Context, Result};
#[cfg(not(feature = "dependency-tasks"))]
use async_broadcast::broadcast;
use async_broadcast::Sender;
#[cfg(not(feature = "dependency-tasks"))]
use async_compatibility_layer::art::async_timeout;
#[cfg(not(feature = "dependency-tasks"))]
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
#[cfg(not(feature = "dependency-tasks"))]
Expand All @@ -29,6 +28,8 @@ use async_std::task::JoinHandle;
use chrono::Utc;
use committable::{Commitment, Committable};
#[cfg(not(feature = "dependency-tasks"))]
use core::time::Duration;
#[cfg(not(feature = "dependency-tasks"))]
use futures::FutureExt;
#[cfg(not(feature = "dependency-tasks"))]
use hotshot_types::{
Expand All @@ -53,6 +54,12 @@ use hotshot_types::{
};
#[cfg(not(feature = "dependency-tasks"))]
use hotshot_types::{data::null_block, message::GeneralConsensusMessage, simple_vote::QuorumData};
#[cfg(not(feature = "dependency-tasks"))]
use std::marker::PhantomData;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
#[cfg(not(feature = "dependency-tasks"))]
Expand Down Expand Up @@ -541,6 +548,56 @@ pub async fn publish_proposal_if_able<TYPES: NodeType>(
.await
}

/// Trigger a request to the network for a proposal for a view and wait for the response
#[cfg(not(feature = "dependency-tasks"))]
async fn fetch_proposal<TYPES: NodeType>(
view: TYPES::Time,
event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
quorum_membership: Arc<TYPES::Membership>,
consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Result<Leaf<TYPES>> {
let (tx, mut rx) = broadcast(1);
let event = ProposalMissing {
view,
response_chan: tx,
};
broadcast_event(
Arc::new(HotShotEvent::QuorumProposalRequest(event)),
&event_stream,
)
.await;
let Ok(Ok(Some(proposal))) = async_timeout(REQUEST_TIMEOUT, rx.recv_direct()).await else {
bail!("Request for proposal failed");
};
let view = proposal.data.view_number();
let justify_qc = proposal.data.justify_qc.clone();

if !justify_qc.is_valid_cert(quorum_membership.as_ref()) {
bail!("Invalid justify_qc in proposal for view {}", *view);
}
let mut consensus_write = consensus.write().await;
let leaf = Leaf::from_quorum_proposal(&proposal.data);
let state = Arc::new(
<TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(&proposal.data.block_header),
);

if let Err(e) = consensus_write.update_validated_state_map(
view,
View {
view_inner: ViewInner::Leaf {
leaf: leaf.commit(),
state,
delta: None,
},
},
) {
tracing::trace!("{e:?}");
}

consensus_write.update_saved_leaves(leaf.clone());
Ok(leaf)
}

/// Handle the received quorum proposal.
///
/// Returns the proposal that should be used to set the `cur_proposal` for other tasks.
Expand Down Expand Up @@ -598,14 +655,29 @@ pub(crate) async fn handle_quorum_proposal_recv<TYPES: NodeType, I: NodeImplemen
debug!("Failed to update view; error = {e:#}");
}

let mut parent_leaf = task_state
.consensus
.read()
.await
.saved_leaves()
.get(&justify_qc.date().leaf_commit)
.cloned();

parent_leaf = match parent_leaf {
Some(p) => Some(p),
None => fetch_proposal(
justify_qc.view_number(),
event_stream.clone(),
Arc::clone(&task_state.quorum_membership),
Arc::clone(&task_state.consensus),
)
.await
.ok(),
};
let consensus_read = task_state.consensus.read().await;

// Get the parent leaf and state.
let parent = match consensus_read
.saved_leaves()
.get(&justify_qc.date().leaf_commit)
.cloned()
{
let parent = match parent_leaf {
Some(leaf) => {
if let (Some(state), _) = consensus_read.state_and_delta(leaf.view_number()) {
Some((leaf, Arc::clone(&state)))
Expand Down Expand Up @@ -1136,14 +1208,30 @@ pub async fn update_state_and_vote_if_able<TYPES: NodeType, I: NodeImplementatio
let Some(cert) = read_consnesus.saved_da_certs().get(&cur_view).cloned() else {
return false;
};
drop(read_consnesus);

let view = cert.view_number;
// TODO: do some of this logic without the vote token check, only do that when voting.
let justify_qc = proposal.justify_qc.clone();
let parent = read_consnesus
let mut parent = consensus
.read()
.await
.saved_leaves()
.get(&justify_qc.date().leaf_commit)
.cloned();
parent = match parent {
Some(p) => Some(p),
None => fetch_proposal(
justify_qc.view_number(),
vote_info.3.clone(),
Arc::clone(&quorum_membership),
Arc::clone(&consensus),
)
.await
.ok(),
};

let read_consnesus = consensus.read().await;

// Justify qc's leaf commitment is not the same as the parent's leaf commitment, but it should be (in this case)
let Some(parent) = parent else {
Expand Down
25 changes: 22 additions & 3 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{fmt::Display, sync::Arc};

use async_broadcast::Sender;
use either::Either;
use hotshot_task::task::TaskEvent;
use hotshot_types::{
Expand Down Expand Up @@ -28,6 +29,24 @@ impl<TYPES: NodeType> TaskEvent for HotShotEvent<TYPES> {
}
}

/// Wrapper type for the event to notify tasks that a proposal for a view is missing
/// and the channel to send the event back to
#[derive(Debug, Clone)]
pub struct ProposalMissing<TYPES: NodeType> {
/// View of missing proposal
pub view: TYPES::Time,
/// Channel to send the response back to
pub response_chan: Sender<Option<Proposal<TYPES, QuorumProposal<TYPES>>>>,
}

impl<TYPES: NodeType> PartialEq for ProposalMissing<TYPES> {
fn eq(&self, other: &Self) -> bool {
self.view == other.view
}
}

impl<TYPES: NodeType> Eq for ProposalMissing<TYPES> {}

/// Marker that the task completed
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct HotShotTaskCompleted;
Expand Down Expand Up @@ -65,7 +84,7 @@ pub enum HotShotEvent<TYPES: NodeType> {
/// A quorum proposal with the given parent leaf is validated.
QuorumProposalValidated(QuorumProposal<TYPES>, Leaf<TYPES>),
/// A quorum proposal is missing for a view that we meed
QuorumProposalMissing(TYPES::Time),
QuorumProposalRequest(ProposalMissing<TYPES>),
/// Send a DA proposal to the DA committee; emitted by the DA leader (which is the same node as the leader of view v + 1) in the DA task
DaProposalSend(Proposal<TYPES, DaProposal<TYPES>>, TYPES::SignatureKey),
/// Send a DA vote to the DA leader; emitted by DA committee members in the DA task after seeing a valid DA proposal
Expand Down Expand Up @@ -395,8 +414,8 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
HotShotEvent::UpgradeDecided(cert) => {
write!(f, "UpgradeDecided(view_number{:?})", cert.view_number())
}
HotShotEvent::QuorumProposalMissing(view_number) => {
write!(f, "QuorumProposalMissing(view_number={view_number:?})")
HotShotEvent::QuorumProposalRequest(view_number) => {
write!(f, "QuorumProposalRequest(view_number={view_number:?})")
}
HotShotEvent::VoteNow(view_number, _) => {
write!(f, "VoteNow(view_number={view_number:?})")
Expand Down
Loading

0 comments on commit 549d41f

Please sign in to comment.