Part of the unit tests.
brunoffranca committed Oct 29, 2024
1 parent 83df8bf commit 1daec32
Showing 14 changed files with 1,637 additions and 1,564 deletions.
2 changes: 1 addition & 1 deletion node/actors/bft/src/chonky_bft/commit.rs
@@ -13,7 +13,7 @@ pub(crate) enum Error {
/// Signer of the message.
signer: Box<validator::PublicKey>,
},
- /// Past view or phase.
+ /// Past view.
#[error("past view (current view: {current_view:?})")]
Old {
/// Current view.
4 changes: 3 additions & 1 deletion node/actors/bft/src/chonky_bft/mod.rs
@@ -20,6 +20,8 @@ pub(crate) mod proposal;
pub(crate) mod proposer;
pub(crate) mod timeout;

#[cfg(test)]
mod testonly;
#[cfg(test)]
mod tests;

@@ -32,7 +34,7 @@ pub(crate) struct StateMachine {
/// Pipe through which replica sends network messages.
pub(super) outbound_pipe: OutputSender,
/// Pipe through which replica receives network requests.
- inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
+ pub(crate) inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
/// The sender part of the justification watch. This is used to set the justification
/// and notify the proposer loop.
pub(crate) justification_watch: sync::watch::Sender<Option<validator::ProposalJustification>>,
26 changes: 21 additions & 5 deletions node/actors/bft/src/chonky_bft/proposer.rs
@@ -1,9 +1,14 @@
use crate::{metrics, Config, OutputSender};
use std::sync::Arc;
- use zksync_concurrency::{ctx, error::Wrap as _, sync};
+ use zksync_concurrency::{ctx, error::Wrap as _, sync, time};
use zksync_consensus_network::io::ConsensusInputMessage;
use zksync_consensus_roles::validator;

/// Timeout for creating a proposal. If the proposal is not created in this time, the proposer
/// will quit trying to create a proposal for this view. This can be different from the replica
/// timeout for the whole view.
pub(crate) const PROPOSAL_CREATION_TIMEOUT: time::Duration = time::Duration::milliseconds(2000);

/// The proposer loop is responsible for proposing new blocks to the network. It watches for new
/// justifications from the replica and if it is the leader for the view, it proposes a new block.
pub(crate) async fn run_proposer(
@@ -13,6 +18,7 @@ pub(crate) async fn run_proposer(
mut justification_watch: sync::watch::Receiver<Option<validator::ProposalJustification>>,
) -> ctx::Result<()> {
loop {
// Wait for a new justification to be available.
let Some(justification) = sync::changed(ctx, &mut justification_watch).await?.clone()
else {
continue;
@@ -23,7 +29,20 @@
continue;
}

- let proposal = create_proposal(ctx, cfg.clone(), justification).await?;
// Create a proposal for the given justification, within the timeout.
let proposal = match create_proposal(
&ctx.with_timeout(PROPOSAL_CREATION_TIMEOUT),
cfg.clone(),
justification,
)
.await
{
Ok(proposal) => proposal,
Err(err) => {
tracing::error!("failed to create proposal: {}", err);
continue;
}
};

// Broadcast our proposal to all replicas (ourselves included).
let msg = cfg
@@ -50,9 +69,6 @@ pub(crate) async fn create_proposal(
// The previous proposal was finalized, so we can propose a new block.
None => {
// Defensively assume that PayloadManager cannot propose until the previous block is stored.
- // if we don't have the previous block, this call will halt until the other replicas timeout.
- // This is fine as we can just not propose anything and let our turn end. Eventually, some other
- // replica will produce some block with this block number and this function will unblock.
if let Some(prev) = block_number.prev() {
cfg.block_store.wait_until_persisted(ctx, prev).await?;
}
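
For context, a minimal sketch of the pattern this hunk introduces: the proposal is built under a child context that carries its own deadline (`ctx.with_timeout`), and a failure only skips the current view instead of terminating the proposer loop. It assumes the zksync_concurrency APIs used above (`ctx::Result`, `ctx.sleep`); the helper names `build_step` and `run_once` are hypothetical and not part of the commit.

use zksync_concurrency::{ctx, time};

// Illustrative stand-in for `create_proposal`: any fallible, context-aware async step.
async fn build_step(ctx: &ctx::Ctx) -> ctx::Result<String> {
    // If the caller's deadline expires, context-aware calls in here return
    // `Canceled`, which `?` converts into a `ctx::Error`.
    ctx.sleep(time::Duration::milliseconds(10)).await?;
    Ok("proposal".to_string())
}

async fn run_once(ctx: &ctx::Ctx) {
    const TIMEOUT: time::Duration = time::Duration::milliseconds(2000);
    // Run the step under a bounded child context; on error, log and move on,
    // mirroring the `match` in `run_proposer` above.
    match build_step(&ctx.with_timeout(TIMEOUT)).await {
        Ok(p) => tracing::info!("created proposal: {}", p),
        Err(err) => tracing::error!("failed to create proposal: {}", err),
    }
}
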
309 changes: 309 additions & 0 deletions node/actors/bft/src/chonky_bft/testonly.rs
@@ -0,0 +1,309 @@
use crate::testonly::RandomPayload;
use crate::{
chonky_bft::{self, commit, new_view, proposal, timeout, StateMachine},
io::OutputMessage,
Config, PayloadManager,
};
use assert_matches::assert_matches;
use std::sync::Arc;
use zksync_concurrency::ctx;
use zksync_concurrency::sync::prunable_mpsc;
use zksync_consensus_network as network;
use zksync_consensus_network::io::ConsensusReq;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
BlockStoreRunner,
};
use zksync_consensus_utils::enum_util::Variant;

pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000;

/// `UTHarness` provides various utilities for unit tests.
/// It is designed to simplify the setup and execution of test cases by encapsulating
/// common testing functionality.
///
/// It should be instantiated once for every test case.
#[cfg(test)]
pub(crate) struct UTHarness {
pub(crate) replica: StateMachine,
pub(crate) keys: Vec<validator::SecretKey>,
output_pipe: ctx::channel::UnboundedReceiver<OutputMessage>,
input_pipe: prunable_mpsc::Sender<ConsensusReq>,
}

impl UTHarness {
/// Creates a new `UTHarness` with the specified validator set size.
pub(crate) async fn new(
ctx: &ctx::Ctx,
num_validators: usize,
) -> (UTHarness, BlockStoreRunner) {
Self::new_with_payload_manager(
ctx,
num_validators,
Box::new(RandomPayload(MAX_PAYLOAD_SIZE)),
)
.await
}

/// Creates a new `UTHarness` with minimally-significant validator set size.
pub(crate) async fn new_many(ctx: &ctx::Ctx) -> (UTHarness, BlockStoreRunner) {
let num_validators = 6;
let (util, runner) = UTHarness::new(ctx, num_validators).await;
assert!(util.genesis().validators.max_faulty_weight() > 0);
(util, runner)
}

pub(crate) async fn new_with_payload_manager(
ctx: &ctx::Ctx,
num_validators: usize,
payload_manager: Box<dyn PayloadManager>,
) -> (UTHarness, BlockStoreRunner) {
let rng = &mut ctx.rng();
let setup = validator::testonly::Setup::new(rng, num_validators);
let store = TestMemoryStorage::new(ctx, &setup).await;
let (send, recv) = ctx::channel::unbounded();

let cfg = Arc::new(Config {
secret_key: setup.validator_keys[0].clone(),
block_store: store.blocks.clone(),
replica_store: Box::new(in_memory::ReplicaStore::default()),
payload_manager,
max_payload_size: MAX_PAYLOAD_SIZE,
});
let (replica, input_pipe) = StateMachine::start(ctx, cfg.clone(), send.clone())
.await
.unwrap();
let mut this = UTHarness {
replica,
keys: setup.validator_keys.clone(),
output_pipe: recv,
input_pipe,
};
this.process_replica_timeout_all(ctx, this.new_replica_timeout())
.await;
(this, store.runner)
}

pub(crate) fn owner_key(&self) -> &validator::SecretKey {
&self.replica.config.secret_key
}

pub(crate) fn leader_key(&self) -> validator::SecretKey {
let leader = self.view_leader(self.replica.view_number);
self.keys
.iter()
.find(|key| key.public() == leader)
.unwrap()
.clone()
}

pub(crate) fn view(&self) -> validator::View {
validator::View {
genesis: self.genesis().hash(),
number: self.replica.view_number,
}
}

pub(crate) fn view_leader(&self, view: validator::ViewNumber) -> validator::PublicKey {
self.genesis().view_leader(view)
}

pub(crate) fn set_owner_as_view_leader(&mut self) {
let mut view = self.replica.view_number;
while self.view_leader(view) != self.owner_key().public() {
view = view.next();
}
self.replica.view_number = view;
}

pub(crate) fn genesis(&self) -> &validator::Genesis {
self.replica.config.genesis()
}

pub(crate) async fn new_leader_proposal(&self, ctx: &ctx::Ctx) -> validator::LeaderProposal {
let justification = self.replica.get_justification();
chonky_bft::proposer::create_proposal(ctx, self.replica.config.clone(), justification)
.await
.unwrap()
}

pub(crate) async fn new_replica_commit(&mut self, ctx: &ctx::Ctx) -> validator::ReplicaCommit {
let proposal = self.new_leader_proposal(ctx).await;

self.process_leader_proposal(ctx, self.leader_key().sign_msg(proposal))
.await
.unwrap()
.msg
}

pub(crate) fn new_replica_timeout(&self) -> validator::ReplicaTimeout {
validator::ReplicaTimeout {
view: self.view(),
high_vote: self.replica.high_vote.clone(),
high_qc: self.replica.high_commit_qc.clone(),
}
}

pub(crate) async fn new_replica_new_view(&self) -> validator::ReplicaNewView {
let justification = self.replica.get_justification();
validator::ReplicaNewView { justification }
}

pub(crate) async fn new_commit_qc(
&mut self,
ctx: &ctx::Ctx,
mutate_fn: impl FnOnce(&mut validator::ReplicaCommit),
) -> validator::CommitQC {
let mut msg = self.new_replica_commit(ctx).await;
mutate_fn(&mut msg);
let mut qc = validator::CommitQC::new(msg, self.genesis());
for key in &self.keys {
qc.add(&key.sign_msg(qc.message.clone()), self.genesis())
.unwrap();
}
qc
}

pub(crate) fn new_timeout_qc(
&mut self,
mutate_fn: impl FnOnce(&mut validator::ReplicaTimeout),
) -> validator::TimeoutQC {
let mut msg = self.new_replica_timeout();
mutate_fn(&mut msg);
let mut qc = validator::TimeoutQC::new(msg.view.clone());
for key in &self.keys {
qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap();
}
qc
}

pub(crate) async fn process_leader_proposal(
&mut self,
ctx: &ctx::Ctx,
msg: validator::Signed<validator::LeaderProposal>,
) -> Result<validator::Signed<validator::ReplicaCommit>, proposal::Error> {
self.replica.on_proposal(ctx, msg).await?;
Ok(self.try_recv().unwrap())
}

pub(crate) async fn process_replica_commit(
&mut self,
ctx: &ctx::Ctx,
msg: validator::Signed<validator::ReplicaCommit>,
) -> Result<Option<validator::Signed<validator::ReplicaNewView>>, commit::Error> {
self.replica.on_commit(ctx, msg).await?;
Ok(self.try_recv())
}

pub(crate) async fn process_replica_timeout(
&mut self,
ctx: &ctx::Ctx,
msg: validator::Signed<validator::ReplicaTimeout>,
) -> Result<Option<validator::Signed<validator::ReplicaNewView>>, timeout::Error> {
self.replica.on_timeout(ctx, msg).await?;
Ok(self.try_recv())
}

pub(crate) async fn process_replica_new_view(
&mut self,
ctx: &ctx::Ctx,
msg: validator::Signed<validator::ReplicaNewView>,
) -> Result<Option<validator::Signed<validator::ReplicaNewView>>, new_view::Error> {
self.replica.on_new_view(ctx, msg).await?;
Ok(self.try_recv())
}

pub(crate) async fn process_replica_commit_all(
&mut self,
ctx: &ctx::Ctx,
msg: validator::ReplicaCommit,
) -> validator::Signed<validator::ReplicaNewView> {
let mut threshold_reached = false;
let mut cur_weight = 0;

for key in self.keys.iter() {
let res = self.replica.on_commit(ctx, key.sign_msg(msg.clone())).await;
let val_index = self.genesis().validators.index(&key.public()).unwrap();

cur_weight += self.genesis().validators.get(val_index).unwrap().weight;

if !threshold_reached {
res.unwrap();
if cur_weight >= self.genesis().validators.quorum_threshold() {
threshold_reached = true;
}
} else {
assert_matches!(res, Err(commit::Error::Old { .. }));
}
}

self.try_recv().unwrap()
}

pub(crate) async fn process_replica_timeout_all(
&mut self,
ctx: &ctx::Ctx,
msg: validator::ReplicaTimeout,
) -> validator::Signed<validator::ReplicaNewView> {
let mut threshold_reached = false;
let mut cur_weight = 0;

for key in self.keys.iter() {
let res = self
.replica
.on_timeout(ctx, key.sign_msg(msg.clone()))
.await;
let val_index = self.genesis().validators.index(&key.public()).unwrap();

cur_weight += self.genesis().validators.get(val_index).unwrap().weight;

if !threshold_reached {
res.unwrap();
if cur_weight >= self.genesis().validators.quorum_threshold() {
threshold_reached = true;
}
} else {
assert_matches!(res, Err(timeout::Error::Old { .. }));
}
}

self.try_recv().unwrap()
}

/// Produces a block, by executing the full view.
pub(crate) async fn produce_block(&mut self, ctx: &ctx::Ctx) {
let replica_commit = self.new_replica_commit(ctx).await;
self.process_replica_commit_all(ctx, replica_commit).await;
}

/// Triggers replica timeout, processes the new validator::ReplicaTimeout
/// to start a new view, then executes the whole new view to make sure
/// that the consensus recovers after a timeout.
pub(crate) async fn produce_block_after_timeout(&mut self, ctx: &ctx::Ctx) {
let cur_view = self.replica.view_number;

self.replica.start_timeout(ctx).await.unwrap();
let replica_timeout = self.try_recv().unwrap().msg;
self.process_replica_timeout_all(ctx, replica_timeout).await;

assert_eq!(self.replica.view_number, cur_view.next());

self.produce_block(ctx).await;
}

pub(crate) fn send(&self, msg: validator::Signed<validator::ConsensusMsg>) {
self.input_pipe.send(ConsensusReq {
msg,
ack: zksync_concurrency::oneshot::channel().0,
});
}

fn try_recv<V: Variant<validator::Msg>>(&mut self) -> Option<validator::Signed<V>> {
self.output_pipe.try_recv().map(|message| match message {
OutputMessage::Network(network::io::ConsensusInputMessage { message, .. }) => {
message.cast().unwrap()
}
})
}
}
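
As a rough usage sketch (not part of this commit), a unit test built on this harness might look like the following. The scaffolding (`ctx::test_root`, `scope::run!`, `abort_on_panic`, `#[tokio::test]`) follows the zksync_concurrency conventions used elsewhere in this repository; the test body itself is an assumption.

use zksync_concurrency::{ctx, scope, testonly::abort_on_panic};

#[tokio::test]
async fn produces_a_block() {
    abort_on_panic();
    let ctx = &ctx::test_root(&ctx::RealClock);
    scope::run!(ctx, |ctx, s| async {
        // One harness per test case; the block-store runner runs in the background.
        let (mut util, runner) = UTHarness::new(ctx, 1).await;
        s.spawn_bg(runner.run(ctx));

        // Drive a full view: propose, gather commit votes, and advance the view.
        util.produce_block(ctx).await;
        Ok(())
    })
    .await
    .unwrap();
}
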
