Skip to content

Commit

Permalink
Expose Requesting Proposal (#3748)
Browse files Browse the repository at this point in the history
* expose requesting proposal

* lint

* logging return error from dep

* fix comment

* fix

* fix test/when we store
  • Loading branch information
bfish713 authored Oct 10, 2024
1 parent db78abb commit 92a51a2
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 16 deletions.
85 changes: 81 additions & 4 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,26 @@

use std::sync::Arc;

use anyhow::{anyhow, Ok, Result};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use futures::Stream;
use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState};
use hotshot_task_impls::events::HotShotEvent;
use hotshot_task::{
dependency::{Dependency, EventDependency},
task::{ConsensusTaskRegistry, NetworkTaskRegistry, Task, TaskState},
};
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
use hotshot_types::{
consensus::Consensus,
data::Leaf,
data::{Leaf, QuorumProposal},
error::HotShotError,
traits::{election::Membership, network::ConnectedNetwork, node_implementation::NodeType},
request_response::ProposalRequestPayload,
traits::{
consensus_api::ConsensusApi, election::Membership, network::ConnectedNetwork,
node_implementation::NodeType, signature_key::SignatureKey,
},
vote::HasViewNumber,
};
use tracing::instrument;

Expand Down Expand Up @@ -77,6 +87,73 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
self.output_event_stream.1.activate_cloned()
}

/// Request a proposal from the all other nodes. Will block until some node
/// returns a valid proposal with the requested commitment. If nobody has the
/// proposal this will block forever
///
/// # Errors
/// Errors if signing the request for proposal fails
pub async fn request_proposal(
&self,
view: TYPES::Time,
leaf_commitment: Commitment<Leaf<TYPES>>,
) -> Result<QuorumProposal<TYPES>> {
// We need to be able to sign this request before submitting it to the network. Compute the
// payload first.
let signed_proposal_request = ProposalRequestPayload {
view_number: view,
key: self.public_key(),
};

// Finally, compute the signature for the payload.
let signature = TYPES::SignatureKey::sign(
self.private_key(),
signed_proposal_request.commit().as_ref(),
)?;

// First, broadcast that we need a proposal
broadcast_event(
HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
&self.internal_event_stream.0,
)
.await;

let mem = &self.memberships.quorum_membership;
let upgrade_lock = &self.hotshot.upgrade_lock;
loop {
let hs_event = EventDependency::new(
self.internal_event_stream.1.activate_cloned(),
Box::new(move |event| {
let event = event.as_ref();
if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event {
quorum_proposal.data.view_number() == view
} else {
false
}
}),
)
.completed()
.await
.ok_or(anyhow!("Event dependency failed to get event"))?;

// Then, if it's `Some`, make sure that the data is correct

if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = hs_event.as_ref() {
// Make sure that the quorum_proposal is valid
if let Err(err) = quorum_proposal.validate_signature(mem, upgrade_lock).await {
tracing::warn!("Invalid Proposal Received after Request. Err {:?}", err);
continue;
}
let proposed_leaf = Leaf::from_quorum_proposal(&quorum_proposal.data);
let commit = proposed_leaf.commit(upgrade_lock).await;
if commit == leaf_commitment {
return Ok(quorum_proposal.data.clone());
}
tracing::warn!("Proposal receied from request has different commitment than expected.\nExpected = {:?}\nReceived{:?}", leaf_commitment, commit);
}
}
}

/// HACK so we can know the types when running tests...
/// there are two cleaner solutions:
/// - make the stream generic and in nodetypes or nodeimpelmentation
Expand Down
5 changes: 2 additions & 3 deletions crates/task-impls/src/consensus/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use core::time::Duration;
use std::{marker::PhantomData, sync::Arc};

use anyhow::{bail, ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use chrono::Utc;
use core::time::Duration;
use futures::FutureExt;
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus, View},
Expand All @@ -33,6 +31,7 @@ use hotshot_types::{
utils::ViewInner,
vote::{Certificate, HasViewNumber},
};
use std::{marker::PhantomData, sync::Arc};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
Expand Down
12 changes: 4 additions & 8 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,13 +492,6 @@ pub async fn validate_proposal_safety_and_liveness<
tracing::debug!("Internal proposal update failed; error = {e:#}");
};

// Update our persistent storage of the proposal. We also itentionally swallow
// this error as it should not affect consensus and would, instead, imply an
// issue on the sequencer side.
if let Err(e) = storage.write().await.append_proposal(&proposal).await {
tracing::debug!("Persisting the proposal update failed; error = {e:#}");
};

// Broadcast that we've updated our consensus state so that other tasks know it's safe to grab.
broadcast_event(
Arc::new(HotShotEvent::ValidatedStateUpdated(view_number, view)),
Expand Down Expand Up @@ -558,8 +551,11 @@ pub async fn validate_proposal_safety_and_liveness<
});
}

// We accept the proposal, notify the application layer
// Update our persistent storage of the proposal. If we cannot store the proposal reutrn
// and error so we don't vote
storage.write().await.append_proposal(&proposal).await?;

// We accept the proposal, notify the application layer
broadcast_event(
Event {
view_number,
Expand Down
1 change: 0 additions & 1 deletion crates/testing/tests/tests_1/consensus_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,6 @@ async fn test_vid_disperse_storage_failure() {
let expectations = vec![Expectations::from_outputs(all_predicates![
validated_state_updated(),
exact(ViewChange(ViewNumber::new(1))),
quorum_proposal_validated(),
])];

let consensus_state =
Expand Down

0 comments on commit 92a51a2

Please sign in to comment.