Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Oct 12, 2023
1 parent 3ea56a4 commit c1ee8de
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 284 deletions.
3 changes: 2 additions & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use hotshot_types::{
election::{ConsensusExchange, Membership},
network::{CommunicationChannel, ConsensusIntentEvent, TransmitType},
node_implementation::{
CommitteeEx, ExchangesType, NodeImplementation, NodeType, QuorumEx, VIDEx, ViewSyncEx, SequencingTimeoutEx
CommitteeEx, ExchangesType, NodeImplementation, NodeType, QuorumEx,
SequencingTimeoutEx, VIDEx, ViewSyncEx,
},
state::ConsensusTime,
},
Expand Down
200 changes: 0 additions & 200 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use hotshot_task::{
task::{FilterEvent, HandleEvent, HotShotTaskCompleted, HotShotTaskTypes, TS},
task_impls::{HSTWithEvent, TaskBuilder},
};
use hotshot_types::traits::election::SignedCertificate;
use hotshot_types::vote::DAVoteAccumulator;
use hotshot_types::{
certificate::DACertificate,
Expand All @@ -29,7 +28,6 @@ use hotshot_types::{
BlockPayload,
},
utils::ViewInner,
vote::{DAVoteAccumulator, VoteType},
};

use snafu::Snafu;
Expand Down Expand Up @@ -188,47 +186,6 @@ where
}
}
}
SequencingHotShotEvent::VidVoteRecv(vote) => {
// TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690

debug!("VID vote recv, collection task {:?}", vote.get_view());
// panic!("Vote handle received DA vote for view {}", *vote.current_view);

let accumulator = state.accumulator.left().unwrap();

match state.committee_exchange.accumulate_vote_2(
accumulator,
&vote,
&vote.block_commitment,
) {
Left(new_accumulator) => {
state.accumulator = either::Left(new_accumulator);
}

Right(vid_cert) => {
debug!("Sending VID cert! {:?}", vid_cert.view_number);
state
.event_stream
.publish(SequencingHotShotEvent::VidCertSend(
vid_cert.clone(),
state.committee_exchange.public_key().clone(),
))
.await;

state.accumulator = Right(vid_cert.clone());
state
.committee_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::CancelPollForVotes(
*vid_cert.view_number,
))
.await;

// Return completed at this point
return (Some(HotShotTaskCompleted::ShutDown), state);
}
}
}
SequencingHotShotEvent::Shutdown => return (Some(HotShotTaskCompleted::ShutDown), state),
_ => {
error!("unexpected event {:?}", event);
Expand Down Expand Up @@ -414,163 +371,6 @@ where
.await;
};
}
SequencingHotShotEvent::VidVoteRecv(vote) => {
// TODO copy-pasted from DAVoteRecv https://github.com/EspressoSystems/HotShot/issues/1690

// warn!(
// "VID vote recv, Main Task {:?}, key: {:?}",
// vote.current_view,
// self.committee_exchange.public_key()
// );
// Check if we are the leader and the vote is from the sender.
let view = vote.current_view;
if !self.committee_exchange.is_leader(view) {
error!(
"We are not the VID leader for view {} are we leader for next view? {}",
*view,
self.committee_exchange.is_leader(view + 1)
);
return None;
}

let handle_event = HandleEvent(Arc::new(move |event, state| {
async move { vote_handle(state, event).await }.boxed()
}));
let collection_view =
if let Some((collection_view, collection_id, _)) = &self.vote_collector {
// TODO: Is this correct for consecutive leaders?
if view > *collection_view {
// warn!("shutting down for view {:?}", collection_view);
self.registry.shutdown_task(*collection_id).await;
}
*collection_view
} else {
TYPES::Time::new(0)
};

let new_accumulator = DAVoteAccumulator {
da_vote_outcomes: HashMap::new(),
success_threshold: self.committee_exchange.success_threshold(),
sig_lists: Vec::new(),
signers: bitvec![0; self.committee_exchange.total_nodes()],
phantom: PhantomData,
};

let accumulator = self.committee_exchange.accumulate_vote_2(
new_accumulator,
&vote,
&vote.clone().block_commitment,
);

if view > collection_view {
let state = DAVoteCollectionTaskState {
committee_exchange: self.committee_exchange.clone(),

accumulator,
cur_view: view,
event_stream: self.event_stream.clone(),
id: self.id,
};
let name = "VID Vote Collection";
let filter = FilterEvent(Arc::new(|event| {
matches!(event, SequencingHotShotEvent::VidVoteRecv(_))
}));
let builder =
TaskBuilder::<DAVoteCollectionTypes<TYPES, I>>::new(name.to_string())
.register_event_stream(self.event_stream.clone(), filter)
.await
.register_registry(&mut self.registry.clone())
.await
.register_state(state)
.register_event_handler(handle_event);
let id = builder.get_task_id().unwrap();
let stream_id = builder.get_stream_id().unwrap();
let _task =
async_spawn(
async move { DAVoteCollectionTypes::build(builder).launch().await },
);
self.vote_collector = Some((view, id, stream_id));
} else if let Some((_, _, stream_id)) = self.vote_collector {
self.event_stream
.direct_message(stream_id, SequencingHotShotEvent::VidVoteRecv(vote))
.await;
};
}
SequencingHotShotEvent::VidDisperseRecv(disperse, sender) => {
// TODO copy-pasted from DAProposalRecv https://github.com/EspressoSystems/HotShot/issues/1690
debug!(
"VID disperse received for view: {:?}",
disperse.data.get_view_number()
);

// ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view
let view = disperse.data.get_view_number();

// Allow a DA proposal that is one view older, in case we have voted on a quorum
// proposal and updated the view.
// `self.cur_view` should be at least 1 since there is a view change before getting
// the `DAProposalRecv` event. Otherewise, the view number subtraction below will
// cause an overflow error.
if view < self.cur_view - 1 {
warn!("Throwing away VID disperse data that is more than one view older");
return None;
}

debug!("VID disperse data is fresh.");
let block_commitment = disperse.data.commitment;

// ED Is this the right leader?
let view_leader_key = self.committee_exchange.get_leader(view);
if view_leader_key != sender {
error!("VID proposal doesn't have expected leader key for view {} \n DA proposal is: [N/A for VID]", *view);
return None;
}

if !view_leader_key.validate(&disperse.signature, block_commitment.as_ref()) {
error!("Could not verify VID proposal sig.");
return None;
}

let vote_token = self.committee_exchange.make_vote_token(view);
match vote_token {
Err(e) => {
error!("Failed to generate vote token for {:?} {:?}", view, e);
}
Ok(None) => {
debug!("We were not chosen for VID quorum on {:?}", view);
}
Ok(Some(vote_token)) => {
// Generate and send vote
let vote = self.committee_exchange.create_vid_message(
block_commitment,
view,
vote_token,
);

// ED Don't think this is necessary?
// self.cur_view = view;

debug!("Sending vote to the VID leader {:?}", vote.current_view);
self.event_stream
.publish(SequencingHotShotEvent::VidVoteSend(vote))
.await;
let mut consensus = self.consensus.write().await;

// Ensure this view is in the view map for garbage collection, but do not overwrite if
// there is already a view there: the replica task may have inserted a `Leaf` view which
// contains strictly more information.
consensus.state_map.entry(view).or_insert(View {
view_inner: ViewInner::DA {
block: block_commitment,
},
});

// Record the block we have promised to make available.
// TODO https://github.com/EspressoSystems/HotShot/issues/1692
// consensus.saved_blocks.insert(proposal.data.deltas);
}
}
}
SequencingHotShotEvent::ViewChange(view) => {
if *self.cur_view >= *view {
return None;
Expand Down
4 changes: 2 additions & 2 deletions crates/task-impls/src/vid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ where

match state
.vid_exchange
.accumulate_vote_2(accumulator, &vote, &vote.block_commitment)
.accumulate_vote(accumulator, &vote, &vote.block_commitment)
{
Left(new_accumulator) => {
state.accumulator = either::Left(new_accumulator);
Expand Down Expand Up @@ -249,7 +249,7 @@ where
phantom: PhantomData,
};

let accumulator = self.vid_exchange.accumulate_vote_2(
let accumulator = self.vid_exchange.accumulate_vote(
new_accumulator,
&vote,
&vote.clone().block_commitment,
Expand Down
74 changes: 1 addition & 73 deletions crates/types/src/traits/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,22 +497,6 @@ pub trait CommitteeExchangeType<TYPES: NodeType, M: NetworkMsg>:
current_view: TYPES::Time,
vote_token: TYPES::VoteTokenType,
) -> DAVote<TYPES>;

// TODO temporary vid methods, move to quorum https://github.com/EspressoSystems/HotShot/issues/1696

/// Create a message with a vote on VID disperse data.
fn create_vid_message(
&self,
block_commitment: Commitment<TYPES::BlockType>,
current_view: TYPES::Time,
vote_token: TYPES::VoteTokenType,
) -> DAVote<TYPES>;

/// Sign a vote on VID proposal.
fn sign_vid_vote(
&self,
block_commitment: Commitment<TYPES::BlockType>,
) -> (EncodedPublicKey, EncodedSignature);
}

/// Standard implementation of [`CommitteeExchangeType`] utilizing a DA committee.
Expand Down Expand Up @@ -584,33 +568,6 @@ impl<
vote_data: VoteData::DA(block_commitment),
}
}

fn create_vid_message(
&self,
block_commitment: Commitment<TYPES::BlockType>,
current_view: <TYPES as NodeType>::Time,
vote_token: <TYPES as NodeType>::VoteTokenType,
) -> DAVote<TYPES> {
let signature = self.sign_vid_vote(block_commitment);
DAVote {
signature,
block_commitment,
current_view,
vote_token,
vote_data: VoteData::DA(block_commitment),
}
}

fn sign_vid_vote(
&self,
block_commitment: Commitment<<TYPES as NodeType>::BlockType>,
) -> (EncodedPublicKey, EncodedSignature) {
let signature = TYPES::SignatureKey::sign(
&self.private_key,
VoteData::DA(block_commitment).commit().as_ref(),
);
(self.public_key.to_bytes(), signature)
}
}

impl<
Expand Down Expand Up @@ -685,7 +642,7 @@ pub trait VIDExchangeType<TYPES: NodeType, M: NetworkMsg>: ConsensusExchange<TYP
) -> (EncodedPublicKey, EncodedSignature);
}

/// Standard implementation of [`VIDExchangeType`] utilizing a DA committee.
/// Standard implementation of [`VIDExchangeType`]
#[derive(Derivative)]
#[derivative(Clone, Debug)]
pub struct VIDExchange<
Expand Down Expand Up @@ -788,35 +745,6 @@ impl<
.make_vote_token(view_number, &self.private_key)
}

fn vote_data(&self, commit: Self::Commitment) -> VoteData<Self::Commitment> {
VoteData::DA(commit)
}

/// Add a vote to the accumulating signature. Return The certificate if the vote
/// brings us over the threshould, Else return the accumulator.
fn accumulate_vote(
&self,
encoded_key: &EncodedPublicKey,
encoded_signature: &EncodedSignature,
leaf_commitment: Self::Commitment,
vote_data: VoteData<Self::Commitment>,
vote_token: TYPES::VoteTokenType,
view_number: TYPES::Time,
accumlator: VoteAccumulator<TYPES::VoteTokenType, Self::Commitment, TYPES>,
_relay: Option<u64>,
) -> Either<VoteAccumulator<TYPES::VoteTokenType, Self::Commitment, TYPES>, Self::Certificate>
{
let meta = VoteMetaData {
encoded_key: encoded_key.clone(),
encoded_signature: encoded_signature.clone(),
commitment: leaf_commitment,
data: vote_data,
vote_token,
view_number,
relay: None,
};
self.accumulate_internal(meta, accumlator)
}
fn membership(&self) -> &Self::Membership {
&self.membership
}
Expand Down
Loading

0 comments on commit c1ee8de

Please sign in to comment.