From 549d41f4e6602b7e3e398585efac6d8113febd13 Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Fri, 14 Jun 2024 14:58:47 -0400 Subject: [PATCH] [CATCHUP] Fetch Proposal when Missing (#3310) * Respond with proposal if we have it * add response chan to missing proposal event * Send proposal back out to requester * do the fetch in when parent proposal is missing * add timeout to request for data * always send back response to chan * simplify match * spawn request task * update maps, add timeout * don't always fetch * fix test * fix upgrade off by one * merge * fix test again after merge * fix dependency * merge * lint for dep version * Combine Serialize and Sign to one fn * Don't always spawn request * rename --- crates/hotshot/src/lib.rs | 2 +- crates/task-impls/src/consensus/helpers.rs | 116 +++++++++++-- crates/task-impls/src/events.rs | 25 ++- crates/task-impls/src/request.rs | 170 +++++++++++++------ crates/task-impls/src/response.rs | 18 +- crates/testing/tests/tests_1/upgrade_task.rs | 17 +- crates/types/src/consensus.rs | 5 + 7 files changed, 273 insertions(+), 80 deletions(-) diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index 0b7f697993..7da7f102f6 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -601,8 +601,8 @@ impl> SystemContext { add_network_message_task(&mut handle, Arc::clone(&da_network)).await; if let Some(request_receiver) = da_network.spawn_request_receiver_task().await { - add_response_task(&mut handle, request_receiver).await; add_request_network_task(&mut handle).await; + add_response_task(&mut handle, request_receiver).await; } add_network_event_task( diff --git a/crates/task-impls/src/consensus/helpers.rs b/crates/task-impls/src/consensus/helpers.rs index fbd3d60fa5..369517b7bd 100644 --- a/crates/task-impls/src/consensus/helpers.rs +++ b/crates/task-impls/src/consensus/helpers.rs @@ -1,11 +1,7 @@ +use crate::{events::HotShotEvent, helpers::broadcast_event}; + #[cfg(not(feature = "dependency-tasks"))] -use core::time::Duration; -#[cfg(not(feature = "dependency-tasks"))] -use std::marker::PhantomData; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use crate::{events::ProposalMissing, request::REQUEST_TIMEOUT}; #[cfg(not(feature = "dependency-tasks"))] use super::ConsensusTaskState; @@ -14,12 +10,15 @@ use crate::{ consensus::{update_view, view_change::SEND_VIEW_CHANGE_EVENT}, helpers::AnyhowTracing, }; -use crate::{events::HotShotEvent, helpers::broadcast_event}; #[cfg(not(feature = "dependency-tasks"))] use anyhow::bail; use anyhow::{ensure, Context, Result}; +#[cfg(not(feature = "dependency-tasks"))] +use async_broadcast::broadcast; use async_broadcast::Sender; #[cfg(not(feature = "dependency-tasks"))] +use async_compatibility_layer::art::async_timeout; +#[cfg(not(feature = "dependency-tasks"))] use async_compatibility_layer::art::{async_sleep, async_spawn}; use async_lock::RwLock; #[cfg(not(feature = "dependency-tasks"))] @@ -29,6 +28,8 @@ use async_std::task::JoinHandle; use chrono::Utc; use committable::{Commitment, Committable}; #[cfg(not(feature = "dependency-tasks"))] +use core::time::Duration; +#[cfg(not(feature = "dependency-tasks"))] use futures::FutureExt; #[cfg(not(feature = "dependency-tasks"))] use hotshot_types::{ @@ -53,6 +54,12 @@ use hotshot_types::{ }; #[cfg(not(feature = "dependency-tasks"))] use hotshot_types::{data::null_block, message::GeneralConsensusMessage, simple_vote::QuorumData}; +#[cfg(not(feature = "dependency-tasks"))] +use std::marker::PhantomData; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; #[cfg(async_executor_impl = "tokio")] use tokio::task::JoinHandle; #[cfg(not(feature = "dependency-tasks"))] @@ -541,6 +548,56 @@ pub async fn publish_proposal_if_able( .await } +/// Trigger a request to the network for a proposal for a view and wait for the response +#[cfg(not(feature = "dependency-tasks"))] +async fn fetch_proposal( + view: TYPES::Time, + event_stream: Sender>>, + quorum_membership: Arc, + consensus: Arc>>, +) -> Result> { + let (tx, mut rx) = broadcast(1); + let event = ProposalMissing { + view, + response_chan: tx, + }; + broadcast_event( + Arc::new(HotShotEvent::QuorumProposalRequest(event)), + &event_stream, + ) + .await; + let Ok(Ok(Some(proposal))) = async_timeout(REQUEST_TIMEOUT, rx.recv_direct()).await else { + bail!("Request for proposal failed"); + }; + let view = proposal.data.view_number(); + let justify_qc = proposal.data.justify_qc.clone(); + + if !justify_qc.is_valid_cert(quorum_membership.as_ref()) { + bail!("Invalid justify_qc in proposal for view {}", *view); + } + let mut consensus_write = consensus.write().await; + let leaf = Leaf::from_quorum_proposal(&proposal.data); + let state = Arc::new( + >::from_header(&proposal.data.block_header), + ); + + if let Err(e) = consensus_write.update_validated_state_map( + view, + View { + view_inner: ViewInner::Leaf { + leaf: leaf.commit(), + state, + delta: None, + }, + }, + ) { + tracing::trace!("{e:?}"); + } + + consensus_write.update_saved_leaves(leaf.clone()); + Ok(leaf) +} + /// Handle the received quorum proposal. /// /// Returns the proposal that should be used to set the `cur_proposal` for other tasks. @@ -598,14 +655,29 @@ pub(crate) async fn handle_quorum_proposal_recv Some(p), + None => fetch_proposal( + justify_qc.view_number(), + event_stream.clone(), + Arc::clone(&task_state.quorum_membership), + Arc::clone(&task_state.consensus), + ) + .await + .ok(), + }; let consensus_read = task_state.consensus.read().await; // Get the parent leaf and state. - let parent = match consensus_read - .saved_leaves() - .get(&justify_qc.date().leaf_commit) - .cloned() - { + let parent = match parent_leaf { Some(leaf) => { if let (Some(state), _) = consensus_read.state_and_delta(leaf.view_number()) { Some((leaf, Arc::clone(&state))) @@ -1136,14 +1208,30 @@ pub async fn update_state_and_vote_if_able Some(p), + None => fetch_proposal( + justify_qc.view_number(), + vote_info.3.clone(), + Arc::clone(&quorum_membership), + Arc::clone(&consensus), + ) + .await + .ok(), + }; + + let read_consnesus = consensus.read().await; // Justify qc's leaf commitment is not the same as the parent's leaf commitment, but it should be (in this case) let Some(parent) = parent else { diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs index adcfba6963..10cba4e195 100644 --- a/crates/task-impls/src/events.rs +++ b/crates/task-impls/src/events.rs @@ -1,5 +1,6 @@ use std::{fmt::Display, sync::Arc}; +use async_broadcast::Sender; use either::Either; use hotshot_task::task::TaskEvent; use hotshot_types::{ @@ -28,6 +29,24 @@ impl TaskEvent for HotShotEvent { } } +/// Wrapper type for the event to notify tasks that a proposal for a view is missing +/// and the channel to send the event back to +#[derive(Debug, Clone)] +pub struct ProposalMissing { + /// View of missing proposal + pub view: TYPES::Time, + /// Channel to send the response back to + pub response_chan: Sender>>>, +} + +impl PartialEq for ProposalMissing { + fn eq(&self, other: &Self) -> bool { + self.view == other.view + } +} + +impl Eq for ProposalMissing {} + /// Marker that the task completed #[derive(Eq, PartialEq, Debug, Clone)] pub struct HotShotTaskCompleted; @@ -65,7 +84,7 @@ pub enum HotShotEvent { /// A quorum proposal with the given parent leaf is validated. QuorumProposalValidated(QuorumProposal, Leaf), /// A quorum proposal is missing for a view that we meed - QuorumProposalMissing(TYPES::Time), + QuorumProposalRequest(ProposalMissing), /// Send a DA proposal to the DA committee; emitted by the DA leader (which is the same node as the leader of view v + 1) in the DA task DaProposalSend(Proposal>, TYPES::SignatureKey), /// Send a DA vote to the DA leader; emitted by DA committee members in the DA task after seeing a valid DA proposal @@ -395,8 +414,8 @@ impl Display for HotShotEvent { HotShotEvent::UpgradeDecided(cert) => { write!(f, "UpgradeDecided(view_number{:?})", cert.view_number()) } - HotShotEvent::QuorumProposalMissing(view_number) => { - write!(f, "QuorumProposalMissing(view_number={view_number:?})") + HotShotEvent::QuorumProposalRequest(view_number) => { + write!(f, "QuorumProposalRequest(view_number={view_number:?})") } HotShotEvent::VoteNow(view_number, _) => { write!(f, "VoteNow(view_number={view_number:?})") diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index bda5ae8831..9d30067b3c 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -17,7 +17,11 @@ use async_trait::async_trait; use hotshot_task::task::TaskState; use hotshot_types::{ consensus::Consensus, - message::{DaConsensusMessage, DataMessage, Message, MessageKind, SequencingMessage}, + data::QuorumProposal, + message::{ + DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal, + SequencingMessage, + }, traits::{ election::Membership, network::{ConnectedNetwork, DataRequest, RequestKind, ResponseMessage}, @@ -32,10 +36,13 @@ use sha2::{Digest, Sha256}; use tokio::task::JoinHandle; use tracing::{debug, error, info, instrument, warn}; -use crate::{events::HotShotEvent, helpers::broadcast_event}; +use crate::{ + events::{HotShotEvent, ProposalMissing}, + helpers::broadcast_event, +}; /// Amount of time to try for a request before timing out. -const REQUEST_TIMEOUT: Duration = Duration::from_millis(500); +pub const REQUEST_TIMEOUT: Duration = Duration::from_millis(500); /// Long running task which will request information after a proposal is received. /// The task will wait a it's `delay` and then send a request iteratively to peers @@ -102,8 +109,12 @@ impl> TaskState for NetworkRequest } Ok(()) } - HotShotEvent::QuorumProposalMissing(view) => { - self.run_delay(RequestKind::Proposal(*view), sender.clone(), *view); + HotShotEvent::QuorumProposalRequest(missing) => { + let ProposalMissing { + view, + response_chan: chan, + } = missing; + self.run_proposal(&RequestKind::Proposal(*view), chan.clone(), *view); Ok(()) } _ => Ok(()), @@ -154,6 +165,22 @@ impl> NetworkRequestState, + ) -> Option<::PureAssembledSignatureType> { + let Ok(data) = bincode::serialize(&request) else { + tracing::error!("Failed to serialize request!"); + return None; + }; + let Ok(signature) = TYPES::SignatureKey::sign(&self.private_key, &Sha256::digest(data)) + else { + error!("Failed to sign Data Request"); + return None; + }; + Some(signature) + } /// run a delayed request task for a request. The first response /// received will be sent over `sender` #[instrument(skip_all, fields(id = self.id, view = *self.view), name = "NetworkRequestState run_delay", level = "error")] @@ -168,7 +195,6 @@ impl> NetworkRequestState> NetworkRequestState, + response_chan: Sender>>>, + view: TYPES::Time, + ) { + let leader = self.da_membership.leader(view); + let requester = ProposalRequester:: { + network: Arc::clone(&self.network), + sender: response_chan, + leader, + }; + let Some(signature) = self.serialize_and_sign(request) else { + return; + }; + + let pub_key = self.public_key.clone(); + async_spawn(async move { + requester.do_proposal(view, signature, pub_key).await; + }); + } + /// Signals delayed requesters to finish pub fn set_shutdown_flag(&self) { self.shutdown_flag.store(true, Ordering::Relaxed); @@ -216,27 +258,71 @@ struct DelayedRequester> { delay: Duration, /// The peers we will request in a random order recipients: Vec, - /// Leader for the view of the request - leader: TYPES::SignatureKey, /// A flag indicating that `HotShotEvent::Shutdown` has been received shutdown_flag: Arc, } +/// A task the requests some data immediately from one peer + +struct ProposalRequester> { + /// Network to send requests + network: Arc, + /// Channel to send the event when we receive a response + sender: Sender>>>, + /// Leader for the view of the request + leader: TYPES::SignatureKey, +} + +impl> ProposalRequester { + /// Handle sending a request for proposal for a view, does + /// not delay + async fn do_proposal( + &self, + view: TYPES::Time, + signature: Signature, + key: TYPES::SignatureKey, + ) { + let response = match bincode::serialize(&make_proposal_req::(view, signature, key)) { + Ok(serialized_msg) => { + async_timeout( + REQUEST_TIMEOUT, + self.network + .request_data::(serialized_msg, &self.leader), + ) + .await + } + Err(e) => { + tracing::error!( + "Failed to serialize outgoing message: this should never happen. Error: {e}" + ); + broadcast_event(None, &self.sender).await; + return; + } + }; + if let Ok(Ok(serialized_response)) = response { + if let Ok(ResponseMessage::Found(msg)) = bincode::deserialize(&serialized_response) { + let SequencingMessage::General(GeneralConsensusMessage::Proposal(prop)) = msg + else { + error!("Requested Proposal but received a non-proposal in response. Response was {:?}", msg); + broadcast_event(None, &self.sender).await; + return; + }; + broadcast_event(Some(prop), &self.sender).await; + } + broadcast_event(None, &self.sender).await; + } else { + broadcast_event(None, &self.sender).await; + } + } +} + /// Wrapper for the info in a VID request struct VidRequest(TYPES::Time, TYPES::SignatureKey); -/// Wrapper for the info in a Proposal fetch request -struct ProposalRequest(TYPES::Time, TYPES::SignatureKey); - impl> DelayedRequester { /// Wait the delay, then try to complete the request. Iterates over peers /// until the request is completed, or the data is no longer needed. - async fn run( - self, - request: RequestKind, - signature: Signature, - pub_key: TYPES::SignatureKey, - ) { + async fn run(self, request: RequestKind, signature: Signature) { match request { RequestKind::Vid(view, key) => { // Do the delay only if primary is up and then start sending @@ -245,28 +331,7 @@ impl> DelayedRequester { } self.do_vid(VidRequest(view, key), signature).await; } - RequestKind::Proposal(view) => { - self.do_proposal(ProposalRequest(view, pub_key), signature) - .await; - } - RequestKind::DaProposal(..) => {} - } - } - /// Handle sending a request for proposal for a view, does - /// not delay - async fn do_proposal(&self, req: ProposalRequest, signature: Signature) { - match bincode::serialize(&make_proposal_req(&req, signature)) { - Ok(serialized_msg) => { - let _ = self - .network - .request_data::(serialized_msg, &self.leader) - .await; - } - Err(e) => { - tracing::error!( - "Failed to serialize outgoing message: this should never happen. Error: {e}" - ); - } + RequestKind::Proposal(..) | RequestKind::DaProposal(..) => {} } } /// Handle sending a VID Share request, runs the loop until the data exists @@ -361,17 +426,18 @@ fn make_vid( /// Build a request for a Proposal fn make_proposal_req( - req: &ProposalRequest, + view: TYPES::Time, signature: Signature, + key: TYPES::SignatureKey, ) -> Message { - let kind = RequestKind::Proposal(req.0); + let kind = RequestKind::Proposal(view); let data_request = DataRequest { - view: req.0, + view, request: kind, signature, }; Message { - sender: req.1.clone(), + sender: key, kind: MessageKind::Data(DataMessage::RequestData(data_request)), } } diff --git a/crates/task-impls/src/response.rs b/crates/task-impls/src/response.rs index db900d9ac9..fa5774a3a4 100644 --- a/crates/task-impls/src/response.rs +++ b/crates/task-impls/src/response.rs @@ -9,7 +9,10 @@ use hotshot_task::dependency::{Dependency, EventDependency}; use hotshot_types::{ consensus::{Consensus, LockedConsensusState}, data::VidDisperseShare, - message::{DaConsensusMessage, DataMessage, Message, MessageKind, Proposal, SequencingMessage}, + message::{ + DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message, MessageKind, Proposal, + SequencingMessage, + }, traits::{ election::Membership, network::{DataRequest, RequestKind, ResponseChannel, ResponseMessage}, @@ -211,12 +214,13 @@ impl NetworkResponseState { self.quorum.has_stake(sender) } /// Lookup the proposal for the view and respond if it's found/not found - #[allow(clippy::no_effect_underscore_binding)] - async fn respond_with_proposal(&self, _view: TYPES::Time) -> ResponseMessage { - // Complete after we are storing our last proposed view: - // https://github.com/EspressoSystems/HotShot/issues/3240 - async {}.await; - todo!(); + async fn respond_with_proposal(&self, view: TYPES::Time) -> ResponseMessage { + match self.consensus.read().await.last_proposals().get(&view) { + Some(prop) => ResponseMessage::Found(SequencingMessage::General( + GeneralConsensusMessage::Proposal(prop.clone()), + )), + None => ResponseMessage::NotFound, + } } } diff --git a/crates/testing/tests/tests_1/upgrade_task.rs b/crates/testing/tests/tests_1/upgrade_task.rs index ea58e3a545..61dd5a37c1 100644 --- a/crates/testing/tests/tests_1/upgrade_task.rs +++ b/crates/testing/tests/tests_1/upgrade_task.rs @@ -366,7 +366,7 @@ async fn test_upgrade_and_consensus_task_blank_blocks() { new_version, decide_by: ViewNumber::new(7), new_version_hash: [0u8; 12].to_vec(), - old_version_last_view: ViewNumber::new(7), + old_version_last_view: ViewNumber::new(6), new_version_first_view: ViewNumber::new(8), }; @@ -504,6 +504,7 @@ async fn test_upgrade_and_consensus_task_blank_blocks() { ViewNumber::new(6), null_block::builder_fee(quorum_membership.total_nodes()).unwrap(), ), + QuorumProposalRecv(proposals[5].clone(), leaders[5]), QcFormed(either::Either::Left(proposals[5].data.justify_qc.clone())), ], vec![ @@ -574,14 +575,24 @@ async fn test_upgrade_and_consensus_task_blank_blocks() { task_state_asserts: vec![], }, Expectations { - output_asserts: vec![quorum_proposal_send_with_null_block( + output_asserts: vec![ + exact(ViewChange(ViewNumber::new(6))), + validated_state_updated(), + quorum_proposal_validated(), + quorum_proposal_send_with_null_block( quorum_membership.total_nodes(), - )], + ), + leaf_decided(), + quorum_vote_send(), + ], task_state_asserts: vec![], }, Expectations { output_asserts: vec![ exact(ViewChange(ViewNumber::new(7))), + validated_state_updated(), + quorum_proposal_validated(), + leaf_decided(), // We do NOT expect a quorum_vote_send() because we have set the block to be non-null in this view. ], task_state_asserts: vec![], diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index e4d31e395a..1add6ba71c 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -239,6 +239,11 @@ impl Consensus { &self.saved_da_certs } + /// Get the map of our recent proposals + pub fn last_proposals(&self) -> &BTreeMap>> { + &self.last_proposals + } + /// Update the current view. /// # Errors /// Can return an error when the new view_number is not higher than the existing view number.