Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit test for DA task #1590

Merged
merged 7 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ test_with_failures:
echo Testing nodes leaving the network with async std executor
RUST_LOG="" ASYNC_STD_THREAD_COUNT=1 cargo test --features=full-ci --lib --bins --tests --benches --workspace --no-fail-fast test_with_failures -- --test-threads=1 --nocapture

test_da_task:
echo Testing the DA task with async std executor
RUST_LOG="" ASYNC_STD_THREAD_COUNT=1 cargo test --features=full-ci --lib --bins --tests --benches --workspace --no-fail-fast test_da_task -- --test-threads=1 --nocapture

test_pkg := "hotshot"

test_name := "sequencing_libp2p_test"
Expand Down
3 changes: 3 additions & 0 deletions task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ where

// Allow a DA proposal that is one view older, in case we have voted on a quorum
// proposal and updated the view.
// `self.cur_view` should be at least 1 since there is a view change before getting
shenkeyao marked this conversation as resolved.
Show resolved Hide resolved
// the `DAProposalRecv` event. Otherewise, the view number subtraction below will
// cause an overflow error.
if view < self.cur_view - 1 {
warn!("Throwing away DA proposal that is more than one view older");
return None;
Expand Down
40 changes: 22 additions & 18 deletions task-impls/src/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ use hotshot_task::{
task_impls::{HSTWithEvent, TaskBuilder},
task_launcher::TaskRunner,
};

use hotshot_types::traits::node_implementation::{NodeImplementation, NodeType};
use snafu::Snafu;
use std::collections::HashSet;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use tracing::error;

pub struct TestHarnessState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
expected_output: HashSet<SequencingHotShotEvent<TYPES, I>>,
expected_output: HashMap<SequencingHotShotEvent<TYPES, I>, usize>,
}

pub struct EventBundle<TYPES: NodeType, I: NodeImplementation<TYPES>>(
Expand Down Expand Up @@ -44,11 +45,13 @@ pub type TestHarnessTaskTypes<TYPES, I> = HSTWithEvent<
TestHarnessState<TYPES, I>,
>;

pub async fn run_harness<TYPES: NodeType, I: NodeImplementation<TYPES>>(
pub async fn run_harness<TYPES: NodeType, I: NodeImplementation<TYPES>, Fut>(
input: Vec<SequencingHotShotEvent<TYPES, I>>,
expected_output: HashSet<SequencingHotShotEvent<TYPES, I>>,
build_fn: fn(TaskRunner, ChannelStream<SequencingHotShotEvent<TYPES, I>>) -> TaskRunner,
) {
expected_output: HashMap<SequencingHotShotEvent<TYPES, I>, usize>,
build_fn: impl FnOnce(TaskRunner, ChannelStream<SequencingHotShotEvent<TYPES, I>>) -> Fut,
) where
Fut: Future<Output = TaskRunner>,
{
let task_runner = TaskRunner::new();
let registry = task_runner.registry.clone();
let event_stream = event_stream::ChannelStream::new();
Expand All @@ -70,19 +73,14 @@ pub async fn run_harness<TYPES: NodeType, I: NodeImplementation<TYPES>>(
let task = TestHarnessTaskTypes::build(builder).launch();

let task_runner = task_runner.add_task(id, "test_harness".to_string(), task);
let task_runner = build_fn(task_runner, event_stream.clone());
let task_runner = build_fn(task_runner, event_stream.clone()).await;

let _runner = async_spawn(async move { task_runner.launch().await });
let runner = async_spawn(async move { task_runner.launch().await });

for event in input {
let _ = event_stream.publish(event).await;
}
// TODO fix type weirdness btwn tokio and async-std
todo!();

// for (_task_name, result) in runner.await.into_iter() {
// assert!(matches!(result, HotShotTaskCompleted::ShutDown));
// }
let _ = runner.await;
}

pub fn handle_event<TYPES: NodeType, I: NodeImplementation<TYPES>>(
Expand All @@ -92,10 +90,16 @@ pub fn handle_event<TYPES: NodeType, I: NodeImplementation<TYPES>>(
std::option::Option<HotShotTaskCompleted>,
TestHarnessState<TYPES, I>,
) {
if !state.expected_output.contains(&event) {
panic!("Got and unexpected event: {:?}", event);
error!("got event: {:?}", event);
if !state.expected_output.contains_key(&event) {
panic!("Got an unexpected event: {:?}", event);
}
let num_expected = state.expected_output.get_mut(&event).unwrap();
if *num_expected == 1 {
state.expected_output.remove(&event);
} else {
*num_expected -= 1;
}
state.expected_output.remove(&event);

if state.expected_output.is_empty() {
return (Some(HotShotTaskCompleted::ShutDown), state);
Expand Down
9 changes: 5 additions & 4 deletions testing/src/node_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ pub struct SequencingWebImpl;
#[derive(Clone, Debug, Deserialize, Serialize, Hash, Eq, PartialEq)]
pub struct StaticFallbackImpl;

type StaticMembership = StaticCommittee<SequencingTestTypes, SequencingLeaf<SequencingTestTypes>>;
pub type StaticMembership =
StaticCommittee<SequencingTestTypes, SequencingLeaf<SequencingTestTypes>>;

type StaticMemoryDAComm = MemoryCommChannel<
pub type StaticMemoryDAComm = MemoryCommChannel<
SequencingTestTypes,
SequencingMemoryImpl,
DAProposal<SequencingTestTypes>,
Expand All @@ -103,7 +104,7 @@ type StaticWebDAComm = WebCommChannel<
type StaticFallbackComm =
WebServerWithFallbackCommChannel<SequencingTestTypes, StaticFallbackImpl, StaticMembership>;

type StaticMemoryQuorumComm = MemoryCommChannel<
pub type StaticMemoryQuorumComm = MemoryCommChannel<
SequencingTestTypes,
SequencingMemoryImpl,
QuorumProposal<SequencingTestTypes, SequencingLeaf<SequencingTestTypes>>,
Expand All @@ -127,7 +128,7 @@ type StaticWebQuorumComm = WebCommChannel<
StaticMembership,
>;

type StaticMemoryViewSyncComm = MemoryCommChannel<
pub type StaticMemoryViewSyncComm = MemoryCommChannel<
SequencingTestTypes,
SequencingMemoryImpl,
ViewSyncCertificate<SequencingTestTypes>,
Expand Down
2 changes: 1 addition & 1 deletion testing/tests/consensus_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// use hotshot::traits::election::static_committee::StaticElectionConfig;
// use hotshot::traits::election::static_committee::StaticVoteToken;
// use hotshot::traits::election::vrf::JfPubKey;
// use hotshot::traits::implementations::MemoryCommChannel;
// use hotshot::traits::implementations::MemoryCommChannefl;
shenkeyao marked this conversation as resolved.
Show resolved Hide resolved
// use hotshot::traits::implementations::MemoryStorage;
// use hotshot::traits::Block;
// use hotshot::types::SignatureKey;
Expand Down
233 changes: 233 additions & 0 deletions testing/tests/da_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
use commit::Committable;
use hotshot::rand::SeedableRng;
use hotshot::traits::election::static_committee::GeneralStaticCommittee;
use hotshot::traits::election::static_committee::StaticElectionConfig;
use hotshot::traits::election::vrf::JfPubKey;
use hotshot::traits::implementations::MemoryStorage;
use hotshot::types::SignatureKey;
use hotshot::types::SystemContextHandle;
use hotshot::HotShotInitializer;
use hotshot::HotShotSequencingConsensusApi;
use hotshot::{certificate::QuorumCertificate, traits::TestableNodeImplementation, SystemContext};
use hotshot_consensus::traits::ConsensusSharedApi;
use hotshot_task_impls::events::SequencingHotShotEvent;
use hotshot_testing::node_types::SequencingMemoryImpl;
use hotshot_testing::node_types::SequencingTestTypes;
use hotshot_testing::node_types::{
StaticMembership, StaticMemoryDAComm, StaticMemoryQuorumComm, StaticMemoryViewSyncComm,
};
use hotshot_testing::test_builder::TestMetadata;
use hotshot_types::certificate::ViewSyncCertificate;
use hotshot_types::data::DAProposal;
use hotshot_types::data::QuorumProposal;
use hotshot_types::data::SequencingLeaf;
use hotshot_types::data::ViewNumber;
use hotshot_types::message::Message;
use hotshot_types::message::SequencingMessage;
use hotshot_types::traits::election::Membership;
use hotshot_types::traits::metrics::NoMetrics;
use hotshot_types::traits::node_implementation::CommitteeEx;
use hotshot_types::traits::node_implementation::ExchangesType;
use hotshot_types::traits::node_implementation::QuorumEx;
use hotshot_types::traits::node_implementation::SequencingQuorumEx;
use hotshot_types::traits::node_implementation::ViewSyncEx;
use hotshot_types::traits::{
election::ConsensusExchange, node_implementation::NodeType, state::ConsensusTime,
};
use hotshot_types::{certificate::DACertificate, vote::ViewSyncData};
use jf_primitives::signatures::BLSSignatureScheme;
use std::collections::HashMap;

// TODO (Keyao) This is the same as `build_consensus_api`. We should move it to a separate file.
async fn build_da_api<
TYPES: NodeType<
ElectionConfigType = StaticElectionConfig,
SignatureKey = JfPubKey<BLSSignatureScheme>,
Time = ViewNumber,
>,
I: TestableNodeImplementation<
TYPES,
Leaf = SequencingLeaf<TYPES>,
ConsensusMessage = SequencingMessage<TYPES, I>,
Storage = MemoryStorage<SequencingTestTypes, SequencingLeaf<SequencingTestTypes>>,
>,
>(
node_id: u64,
) -> SystemContextHandle<TYPES, I>
where
I::Exchanges: ExchangesType<
TYPES,
I::Leaf,
Message<TYPES, I>,
ElectionConfigs = (StaticElectionConfig, StaticElectionConfig),
>,
SequencingQuorumEx<TYPES, I>: ConsensusExchange<
TYPES,
Message<TYPES, I>,
Proposal = QuorumProposal<TYPES, SequencingLeaf<TYPES>>,
Certificate = QuorumCertificate<TYPES, SequencingLeaf<TYPES>>,
Commitment = SequencingLeaf<TYPES>,
Membership = StaticMembership,
Networking = StaticMemoryQuorumComm,
>,
CommitteeEx<TYPES, I>: ConsensusExchange<
TYPES,
Message<TYPES, I>,
Proposal = DAProposal<TYPES>,
Certificate = DACertificate<TYPES>,
Commitment = TYPES::BlockType,
Membership = StaticMembership,
Networking = StaticMemoryDAComm,
>,
ViewSyncEx<TYPES, I>: ConsensusExchange<
TYPES,
Message<TYPES, I>,
Proposal = ViewSyncCertificate<TYPES>,
Certificate = ViewSyncCertificate<TYPES>,
Commitment = ViewSyncData<TYPES>,
Membership = StaticMembership,
Networking = StaticMemoryViewSyncComm,
>,
// Why do we need this?
GeneralStaticCommittee<
SequencingTestTypes,
SequencingLeaf<SequencingTestTypes>,
JfPubKey<BLSSignatureScheme>,
>: Membership<TYPES>,
{
let builder = TestMetadata::default_multiple_rounds();

let launcher = builder.gen_launcher::<SequencingTestTypes, SequencingMemoryImpl>();

let networks = (launcher.resource_generator.channel_generator)(node_id);
let storage = (launcher.resource_generator.storage)(node_id);
let config = launcher.resource_generator.config.clone();
let initializer =
HotShotInitializer::<TYPES, I::Leaf>::from_genesis(I::block_genesis()).unwrap();

let known_nodes = config.known_nodes.clone();
let private_key = I::generate_test_key(node_id);
let public_key = TYPES::SignatureKey::from_private(&private_key);
let ek =
jf_primitives::aead::KeyPair::generate(&mut rand_chacha::ChaChaRng::from_seed([0u8; 32]));
let quorum_election_config = config.election_config.clone().unwrap_or_else(|| {
<QuorumEx<TYPES,I> as ConsensusExchange<
TYPES,
Message<TYPES, I>,
>>::Membership::default_election_config(config.total_nodes.get() as u64)
});

let committee_election_config = config.election_config.clone().unwrap_or_else(|| {
<CommitteeEx<TYPES,I> as ConsensusExchange<
TYPES,
Message<TYPES, I>,
>>::Membership::default_election_config(config.total_nodes.get() as u64)
});
let exchanges = I::Exchanges::create(
known_nodes.clone(),
(quorum_election_config, committee_election_config),
networks,
public_key.clone(),
private_key.clone(),
ek.clone(),
);
SystemContext::init(
public_key,
private_key,
node_id,
config,
storage,
exchanges,
initializer,
NoMetrics::boxed(),
)
.await
.expect("Could not init hotshot")
}

#[cfg(test)]
#[cfg_attr(
feature = "tokio-executor",
tokio::test(flavor = "multi_thread", worker_threads = 2)
)]
#[cfg_attr(feature = "async-std-executor", async_std::test)]
async fn test_da_task() {
use hotshot::{
demos::sdemo::{SDemoBlock, SDemoNormalBlock},
tasks::add_da_task,
};
use hotshot_task_impls::harness::run_harness;
use hotshot_types::{
message::{CommitteeConsensusMessage, Proposal},
traits::election::CommitteeExchangeType,
};

async_compatibility_layer::logging::setup_logging();
async_compatibility_layer::logging::setup_backtrace();

// Build the API for node 2.
let handle = build_da_api::<
hotshot_testing::node_types::SequencingTestTypes,
hotshot_testing::node_types::SequencingMemoryImpl,
>(2)
.await;
let api: HotShotSequencingConsensusApi<SequencingTestTypes, SequencingMemoryImpl> =
HotShotSequencingConsensusApi {
inner: handle.hotshot.inner.clone(),
};
let committee_exchange = api.inner.exchanges.committee_exchange().clone();
let pub_key = api.public_key().clone();
let block = SDemoBlock::Normal(SDemoNormalBlock {
previous_state: (),
transactions: Vec::new(),
});
let block_commitment = block.commit();
let signature = committee_exchange.sign_da_proposal(&block_commitment);
let proposal = DAProposal {
deltas: block.clone(),
view_number: ViewNumber::new(2),
};
let message = Proposal {
data: proposal,
signature,
};

// Every event input is seen on the event stream in the output.
let mut input = Vec::new();
let mut output = HashMap::new();

// In view 1, node 2 is the next leader.
input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)));
input.push(SequencingHotShotEvent::ViewChange(ViewNumber::new(2)));
input.push(SequencingHotShotEvent::DAProposalRecv(
message.clone(),
pub_key.clone(),
));
input.push(SequencingHotShotEvent::Shutdown);

output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(1)), 1);
output.insert(SequencingHotShotEvent::SendDABlockData(block), 1);
output.insert(
SequencingHotShotEvent::DAProposalSend(message.clone(), pub_key.clone()),
1,
);
if let Ok(Some(vote_token)) = committee_exchange.make_vote_token(ViewNumber::new(2)) {
let da_message =
committee_exchange.create_da_message(block_commitment, ViewNumber::new(2), vote_token);
if let CommitteeConsensusMessage::DAVote(vote) = da_message {
output.insert(SequencingHotShotEvent::DAVoteSend(vote), 1);
}
}
output.insert(
SequencingHotShotEvent::DAProposalRecv(message, pub_key.clone()),
1,
);
output.insert(SequencingHotShotEvent::ViewChange(ViewNumber::new(2)), 1);
output.insert(SequencingHotShotEvent::Shutdown, 1);

let build_fn = |task_runner, event_stream| {
add_da_task(task_runner, event_stream, committee_exchange, handle)
};

run_harness(input, output, build_fn).await;
}
Loading