Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick webserver catchup endpoint #1907

Merged
merged 4 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
while self.running.load(Ordering::Relaxed) {
let endpoint = match message_purpose {
MessagePurpose::Proposal => config::get_proposal_route(view_number),
MessagePurpose::CurrentProposal => config::get_recent_proposal_route(),
MessagePurpose::Vote => config::get_vote_route(view_number, vote_index),
MessagePurpose::Data => config::get_transactions_route(tx_index),
MessagePurpose::Internal => unimplemented!(),
Expand Down Expand Up @@ -221,6 +222,15 @@ impl<M: NetworkMsg, KEY: SignatureKey, TYPES: NodeType> Inner<M, KEY, TYPES> {
// }
// }
}
MessagePurpose::CurrentProposal => {
// Only pushing the first proposal since we will soon only be allowing 1 proposal per view
self.broadcast_poll_queue
.write()
.await
.push(deserialized_messages[0].clone());

return Ok(());
}
MessagePurpose::Vote => {
// error!(
// "Received {} votes from web server for view {} is da {}",
Expand Down Expand Up @@ -500,7 +510,9 @@ impl<
MessagePurpose::Proposal => config::post_proposal_route(*view_number),
MessagePurpose::Vote => config::post_vote_route(*view_number),
MessagePurpose::Data => config::post_transactions_route(),
MessagePurpose::Internal => return Err(WebServerNetworkError::EndpointError),
MessagePurpose::Internal | MessagePurpose::CurrentProposal => {
return Err(WebServerNetworkError::EndpointError)
}
MessagePurpose::ViewSyncProposal => {
// error!("Posting view sync proposal route is: {}", config::post_view_sync_proposal_route(*view_number));
config::post_view_sync_proposal_route(*view_number)
Expand Down Expand Up @@ -783,6 +795,25 @@ impl<
.await;
}
}
ConsensusIntentEvent::PollForCurrentProposal => {
// create new task
let (_, receiver) = unbounded();

async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::CurrentProposal, 1)
.await
{
error!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
}
ConsensusIntentEvent::PollForVotes(view_number) => {
let mut task_map = self.inner.vote_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down
7 changes: 6 additions & 1 deletion crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,12 @@ where
// }
self.cur_view = new_view;
self.current_proposal = None;

if new_view == TYPES::Time::new(1) {
self.quorum_exchange
.network()
.inject_consensus_info(ConsensusIntentEvent::PollForCurrentProposal)
.await;
}
// Start polling for proposals for the new view
self.quorum_exchange
.network()
Expand Down
3 changes: 2 additions & 1 deletion crates/testing/src/spinning_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ impl SpinningTaskDescription {
state.late_start.remove(&idx.try_into().unwrap())
{
tracing::error!("Spinning up node late");
node.run_tasks().await;
let handle = node.run_tasks().await;
handle.hotshot.start_consensus().await;
}
}
UpDown::Down => {
Expand Down
5 changes: 0 additions & 5 deletions crates/testing/src/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ where
}
}
}
assert!(
late_start_nodes.len()
<= self.launcher.metadata.total_nodes - self.launcher.metadata.start_nodes,
"Test wants to late start too many nodes."
);

self.add_nodes(self.launcher.metadata.total_nodes, &late_start_nodes)
.await;
Expand Down
65 changes: 62 additions & 3 deletions crates/testing/tests/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn test_catchup() {
metadata.total_nodes = 20;

metadata.spinning_properties = SpinningTaskDescription {
node_changes: vec![(Duration::new(1, 0), catchup_nodes)],
node_changes: vec![(Duration::from_millis(1400), catchup_nodes)],
};

metadata.completion_task_description =
Expand All @@ -51,6 +51,7 @@ async fn test_catchup() {
check_leaf: true,
..Default::default()
};
metadata.overall_safety_properties.num_failed_views = 2;

metadata
.gen_launcher::<SequencingTestTypes, SequencingMemoryImpl>()
Expand All @@ -59,6 +60,64 @@ async fn test_catchup() {
.await;
}

/// Catchup test run against the web server network implementation
/// (`SequencingWebImpl`).
///
/// Configures 20 total / 20 start nodes with a 1000 `next_view_timeout`,
/// schedules an `UpDown::Up` change for node index 18 after 2500 ms, and runs
/// under a time-based completion task, tolerating at most one failed view.
#[cfg(test)]
#[cfg_attr(
    async_executor_impl = "tokio",
    tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
async fn test_catchup_web() {
    use std::time::Duration;

    use hotshot_testing::{
        completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription},
        node_types::{SequencingTestTypes, SequencingWebImpl},
        overall_safety_task::OverallSafetyPropertiesDescription,
        spinning_task::{ChangeNode, SpinningTaskDescription, UpDown},
        test_builder::{TestMetadata, TimingData},
    };

    async_compatibility_layer::logging::setup_logging();
    async_compatibility_layer::logging::setup_backtrace();

    let view_timing = TimingData {
        next_view_timeout: 1000,
        ..Default::default()
    };
    let mut metadata = TestMetadata::default();

    metadata.timing_data = view_timing;
    metadata.start_nodes = 20;
    metadata.total_nodes = 20;

    // A single node (index 18) is the one that has to catch up.
    let late_node = ChangeNode {
        idx: 18,
        updown: UpDown::Up,
    };
    metadata.spinning_properties = SpinningTaskDescription {
        node_changes: vec![(Duration::from_millis(2500), vec![late_node])],
    };

    // 100 seconds, i.e. the same 100000 ms budget as the other catchup tests.
    let completion = TimeBasedCompletionTaskDescription {
        duration: Duration::from_secs(100),
    };
    metadata.completion_task_description =
        CompletionTaskDescription::TimeBasedCompletionTaskBuilder(completion);

    metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
        check_leaf: true,
        // Only allow the view during which the catchup node has not started yet to fail.
        num_failed_views: 1,
        ..Default::default()
    };

    let launcher = metadata.gen_launcher::<SequencingTestTypes, SequencingWebImpl>();
    launcher.launch().run_test().await;
}

/// Test that one node catches up and has successful views after coming back
#[cfg(test)]
#[cfg_attr(
Expand Down Expand Up @@ -94,13 +153,13 @@ async fn test_catchup_one_node() {
metadata.total_nodes = 20;

metadata.spinning_properties = SpinningTaskDescription {
node_changes: vec![(Duration::new(1, 0), catchup_nodes)],
node_changes: vec![(Duration::from_millis(400), catchup_nodes)],
};

metadata.completion_task_description =
CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
TimeBasedCompletionTaskDescription {
duration: Duration::from_millis(100000),
duration: Duration::from_millis(20000),
},
);
metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn test_timeout() {
metadata.timing_data = timing_data;

metadata.spinning_properties = SpinningTaskDescription {
node_changes: vec![(Duration::new(1, 0), dead_nodes)],
node_changes: vec![(Duration::new(0, 5000), dead_nodes)],
};

// TODO ED Add safety task, etc to confirm TCs are being formed
Expand Down
2 changes: 2 additions & 0 deletions crates/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub struct Messages<TYPES: NodeType, I: NodeImplementation<TYPES>>(pub Vec<Messa
pub enum MessagePurpose {
/// Message with a quorum proposal.
Proposal,
/// Message with most recent proposal the server has
CurrentProposal,
/// Message with a quorum vote.
Vote,
/// Message with a view sync vote.
Expand Down
3 changes: 3 additions & 0 deletions crates/types/src/traits/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub enum ConsensusIntentEvent {
PollForVotes(u64),
/// Poll for a proposal for a particular view
PollForProposal(u64),
/// Poll for the most recent proposal the webserver has
PollForCurrentProposal,
/// Poll for a DAC for a particular view
PollForDAC(u64),
/// Poll for view sync votes starting at a particular view
Expand Down Expand Up @@ -174,6 +176,7 @@ impl ConsensusIntentEvent {
| ConsensusIntentEvent::PollForViewSyncCertificate(view_number)
| ConsensusIntentEvent::PollForTransactions(view_number)
| ConsensusIntentEvent::CancelPollForTransactions(view_number) => *view_number,
ConsensusIntentEvent::PollForCurrentProposal => 1,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/web_server/api.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ DOC = """
Return the proposal for a given view number
"""

# GET the most recent proposal the server has; no view number is passed
[route.getrecentproposal]
PATH = ["proposal/"]
DOC = """
Return the proposal for the most recent view the server has
"""

# POST a proposal, where the view is passed as an argument
[route.postproposal]
PATH = ["proposal/:view_number"]
Expand Down
4 changes: 4 additions & 0 deletions crates/web_server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ pub fn post_proposal_route(view_number: u64) -> String {
format!("api/proposal/{view_number}")
}

/// Returns the relative route (`api/proposal`) used to request the most
/// recent proposal; unlike the per-view routes it takes no view number.
pub fn get_recent_proposal_route() -> String {
    String::from("api/proposal")
}

/// Returns the relative route for the DA certificate of `view_number`,
/// i.e. `api/certificate/<view_number>`.
pub fn get_da_certificate_route(view_number: u64) -> String {
    let mut route = String::from("api/certificate/");
    route.push_str(&view_number.to_string());
    route
}
Expand Down
15 changes: 15 additions & 0 deletions crates/web_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct WebServerState<KEY> {
da_certificates: HashMap<u64, (String, Vec<u8>)>,
/// view for oldest proposals in memory
oldest_proposal: u64,
/// view for the most recent proposal to help nodes catchup
recent_proposal: u64,
/// view for the oldest DA certificate
oldest_certificate: u64,

Expand Down Expand Up @@ -74,6 +76,7 @@ impl<KEY: SignatureKey + 'static> WebServerState<KEY> {
num_txns: 0,
oldest_vote: 0,
oldest_proposal: 0,
recent_proposal: 0,
oldest_certificate: 0,
shutdown: None,
stake_table: Vec::new(),
Expand Down Expand Up @@ -101,6 +104,7 @@ impl<KEY: SignatureKey + 'static> WebServerState<KEY> {
/// Trait defining methods needed for the `WebServerState`
pub trait WebServerDataSource<KEY> {
fn get_proposal(&self, view_number: u64) -> Result<Option<Vec<Vec<u8>>>, Error>;
fn get_recent_proposal(&self) -> Result<Option<Vec<Vec<u8>>>, Error>;
fn get_view_sync_proposal(
&self,
view_number: u64,
Expand Down Expand Up @@ -156,6 +160,10 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
}
}

/// Looks up the proposal for the view currently recorded in
/// `recent_proposal`, delegating to `get_proposal` for the actual retrieval.
fn get_recent_proposal(&self) -> Result<Option<Vec<Vec<u8>>>, Error> {
    let latest_view = self.recent_proposal;
    self.get_proposal(latest_view)
}

fn get_view_sync_proposal(
&self,
view_number: u64,
Expand Down Expand Up @@ -316,6 +324,10 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
fn post_proposal(&mut self, view_number: u64, mut proposal: Vec<u8>) -> Result<(), Error> {
debug!("Received proposal for view {}", view_number);

if view_number > self.recent_proposal {
self.recent_proposal = view_number;
}

// Only keep proposal history for MAX_VIEWS number of view
if self.proposals.len() >= MAX_VIEWS {
self.proposals.remove(&self.oldest_proposal);
Expand Down Expand Up @@ -495,6 +507,9 @@ where
}
.boxed()
})?
.get("getrecentproposal", |_req, state| {
async move { state.get_recent_proposal() }.boxed()
})?
.get("getviewsyncproposal", |req, state| {
async move {
let view_number: u64 = req.integer_param("view_number")?;
Expand Down