
Commit

More unit tests.

brunoffranca committed Oct 30, 2024
1 parent 1daec32 commit 7d53516
Showing 12 changed files with 450 additions and 285 deletions.
13 changes: 6 additions & 7 deletions node/actors/bft/src/chonky_bft/mod.rs
@@ -21,7 +21,7 @@ pub(crate) mod proposer;
pub(crate) mod timeout;

#[cfg(test)]
mod testonly;
pub(crate) mod testonly;
#[cfg(test)]
mod tests;

@@ -35,9 +35,9 @@ pub(crate) struct StateMachine {
pub(super) outbound_pipe: OutputSender,
/// Pipe through which replica receives network requests.
pub(crate) inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
/// The sender part of the justification watch. This is used to set the justification
/// and notify the proposer loop.
pub(crate) justification_watch: sync::watch::Sender<Option<validator::ProposalJustification>>,
/// The sender part of the proposer watch channel. This is used to notify the proposer loop
/// and send the needed justification.
pub(crate) proposer_pipe: sync::watch::Sender<Option<validator::ProposalJustification>>,

/// The current view number.
pub(crate) view_number: validator::ViewNumber,
@@ -83,6 +83,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
config: Arc<Config>,
outbound_pipe: OutputSender,
proposer_pipe: sync::watch::Sender<Option<validator::ProposalJustification>>,
) -> ctx::Result<(Self, sync::prunable_mpsc::Sender<ConsensusReq>)> {
let backup = config.replica_store.state(ctx).await?;

@@ -99,12 +100,11 @@
StateMachine::inbound_selection_function,
);

let (justification_sender, _) = sync::watch::channel(None);

let this = Self {
config,
outbound_pipe,
inbound_pipe: recv,
proposer_pipe,
view_number: backup.view,
phase: backup.phase,
high_vote: backup.high_vote,
Expand All @@ -115,7 +115,6 @@ impl StateMachine {
commit_qcs_cache: BTreeMap::new(),
timeout_views_cache: BTreeMap::new(),
timeout_qcs_cache: BTreeMap::new(),
justification_watch: justification_sender,
timeout_deadline: time::Deadline::Finite(ctx.now() + Self::TIMEOUT_DURATION),
phase_start: ctx.now(),
};
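The hunks above replace the internally created `justification_watch` with a `proposer_pipe` sender that callers now inject through `StateMachine::start`. A minimal sketch of that wiring, using tokio's `watch` channel as a stand-in for `zksync_concurrency::sync::watch` (an assumption, as is the placeholder `Justification` type):

```rust
// Sketch only: tokio's watch channel stands in for
// zksync_concurrency::sync::watch, and `Justification` is a hypothetical
// placeholder for validator::ProposalJustification.
use tokio::sync::watch;

#[derive(Clone, Debug)]
struct Justification(u64);

#[tokio::main]
async fn main() {
    // The replica keeps the sender half (`proposer_pipe`); the proposer loop
    // gets the receiver half. This mirrors the new StateMachine::start
    // signature, where the sender is injected instead of created internally.
    let (proposer_pipe, mut proposer_recv) = watch::channel::<Option<Justification>>(None);

    let proposer = tokio::spawn(async move {
        // Park until the replica publishes a justification for a new view.
        proposer_recv.changed().await.expect("sender dropped");
        let justification = proposer_recv.borrow().clone();
        println!("proposer woke with {justification:?}");
    });

    // On a view change the replica publishes the justification, waking the
    // proposer loop (see the new_view.rs hunk below).
    proposer_pipe
        .send(Some(Justification(42)))
        .expect("proposer loop terminated");

    proposer.await.unwrap();
}
```

Injecting the sender also lets tests keep a receiver handle, which is exactly what the `UTHarness` changes further down do.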
4 changes: 3 additions & 1 deletion node/actors/bft/src/chonky_bft/new_view.rs
@@ -115,7 +115,9 @@ impl StateMachine {
// Update the state machine.
self.view_number = view;
self.phase = validator::Phase::Prepare;
// TODO: Update the proposer channel.
self.proposer_pipe
.send(Some(self.get_justification()))
.expect("justification_watch.send() failed");

// Clear the block proposal cache.
if let Some(qc) = self.high_commit_qc.as_ref() {
4 changes: 2 additions & 2 deletions node/actors/bft/src/chonky_bft/proposer.rs
@@ -14,7 +14,7 @@ pub(crate) const PROPOSAL_CREATION_TIMEOUT: time::Duration = time::Duration::mil
pub(crate) async fn run_proposer(
ctx: &ctx::Ctx,
cfg: Arc<Config>,
pipe: OutputSender,
outbound_pipe: OutputSender,
mut justification_watch: sync::watch::Receiver<Option<validator::ProposalJustification>>,
) -> ctx::Result<()> {
loop {
@@ -49,7 +49,7 @@ pub(crate) async fn run_proposer(
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderProposal(proposal));

pipe.send(ConsensusInputMessage { message: msg }.into());
outbound_pipe.send(ConsensusInputMessage { message: msg }.into());
}
}

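The renamed `outbound_pipe` makes `run_proposer` consistent with the state machine's field names. Its control flow follows this shape (a hedged sketch with tokio stand-ins, not the crate's real API; payload building and signing are elided):

```rust
// Sketch only: tokio channels stand in for the crate's primitives, and
// `Justification` is a hypothetical placeholder type.
use tokio::sync::{mpsc, watch};

#[derive(Clone, Debug)]
struct Justification(u64);

async fn run_proposer(
    mut justification_watch: watch::Receiver<Option<Justification>>,
    outbound_pipe: mpsc::UnboundedSender<String>,
) {
    loop {
        // Wake whenever the replica publishes a fresh justification.
        if justification_watch.changed().await.is_err() {
            return; // Replica dropped the sender; shut the loop down.
        }
        let Some(justification) = justification_watch.borrow_and_update().clone() else {
            continue; // Nothing to propose yet.
        };
        // The real loop builds a payload under PROPOSAL_CREATION_TIMEOUT and
        // signs a LeaderProposal here; both steps are elided in this sketch.
        outbound_pipe
            .send(format!("proposal justified by {justification:?}"))
            .expect("network actor terminated");
    }
}
```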
38 changes: 17 additions & 21 deletions node/actors/bft/src/chonky_bft/testonly.rs
@@ -6,8 +6,8 @@ use crate::{
};
use assert_matches::assert_matches;
use std::sync::Arc;
use zksync_concurrency::ctx;
use zksync_concurrency::sync::prunable_mpsc;
use zksync_concurrency::{ctx, sync};
use zksync_consensus_network as network;
use zksync_consensus_network::io::ConsensusReq;
use zksync_consensus_roles::validator;
@@ -28,8 +28,9 @@ pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000;
pub(crate) struct UTHarness {
pub(crate) replica: StateMachine,
pub(crate) keys: Vec<validator::SecretKey>,
output_pipe: ctx::channel::UnboundedReceiver<OutputMessage>,
input_pipe: prunable_mpsc::Sender<ConsensusReq>,
pub(crate) outbound_pipe: ctx::channel::UnboundedReceiver<OutputMessage>,
pub(crate) inbound_pipe: prunable_mpsc::Sender<ConsensusReq>,
pub(crate) proposer_pipe: sync::watch::Receiver<Option<validator::ProposalJustification>>,
}

impl UTHarness {
@@ -63,6 +64,7 @@ impl UTHarness {
let setup = validator::testonly::Setup::new(rng, num_validators);
let store = TestMemoryStorage::new(ctx, &setup).await;
let (send, recv) = ctx::channel::unbounded();
let (proposer_sender, proposer_receiver) = sync::watch::channel(None);

let cfg = Arc::new(Config {
secret_key: setup.validator_keys[0].clone(),
@@ -71,14 +73,16 @@
payload_manager,
max_payload_size: MAX_PAYLOAD_SIZE,
});
let (replica, input_pipe) = StateMachine::start(ctx, cfg.clone(), send.clone())
.await
.unwrap();
let (replica, input_pipe) =
StateMachine::start(ctx, cfg.clone(), send.clone(), proposer_sender)
.await
.unwrap();
let mut this = UTHarness {
replica,
keys: setup.validator_keys.clone(),
output_pipe: recv,
input_pipe,
outbound_pipe: recv,
inbound_pipe: input_pipe,
proposer_pipe: proposer_receiver,
};
this.process_replica_timeout_all(ctx, this.new_replica_timeout())
.await;
@@ -109,14 +113,6 @@
self.genesis().view_leader(view)
}

pub(crate) fn set_owner_as_view_leader(&mut self) {
let mut view = self.replica.view_number;
while self.view_leader(view) != self.owner_key().public() {
view = view.next();
}
self.replica.view_number = view;
}

pub(crate) fn genesis(&self) -> &validator::Genesis {
self.replica.config.genesis()
}
@@ -157,14 +153,14 @@
) -> validator::CommitQC {
let mut msg = self.new_replica_commit(ctx).await;
mutate_fn(&mut msg);
let mut qc = validator::CommitQC::new(msg, self.genesis());
let mut qc = validator::CommitQC::new(msg.clone(), self.genesis());
for key in &self.keys {
qc.add(&key.sign_msg(qc.message.clone()), self.genesis())
.unwrap();
qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap();
}
qc
}

#[allow(dead_code)]
pub(crate) fn new_timeout_qc(
&mut self,
mutate_fn: impl FnOnce(&mut validator::ReplicaTimeout),
@@ -293,14 +289,14 @@
}

pub(crate) fn send(&self, msg: validator::Signed<validator::ConsensusMsg>) {
self.input_pipe.send(ConsensusReq {
self.inbound_pipe.send(ConsensusReq {
msg,
ack: zksync_concurrency::oneshot::channel().0,
});
}

fn try_recv<V: Variant<validator::Msg>>(&mut self) -> Option<validator::Signed<V>> {
self.output_pipe.try_recv().map(|message| match message {
self.outbound_pipe.try_recv().map(|message| match message {
OutputMessage::Network(network::io::ConsensusInputMessage { message, .. }) => {
message.cast().unwrap()
}
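The harness fields now mirror the state machine's naming (`outbound_pipe`, `inbound_pipe`, `proposer_pipe`), and all three are `pub(crate)` so tests can drive them directly. A hedged sketch of that plumbing (tokio channels stand in for the crate's `prunable_mpsc` and unbounded channel; `Req` is a hypothetical simplification of `ConsensusReq`):

```rust
// Sketch only: illustrative stand-in types, not the crate's real API.
use tokio::sync::{mpsc, oneshot};

struct Req {
    msg: String,
    // Mirrors ConsensusReq::ack; the sketch simply drops the receiver half.
    ack: oneshot::Sender<()>,
}

struct Harness {
    inbound_pipe: mpsc::UnboundedSender<Req>,
    outbound_pipe: mpsc::UnboundedReceiver<String>,
}

impl Harness {
    fn send(&self, msg: String) {
        // Matches UTHarness::send: a fresh ack channel per request, with only
        // the sender half travelling alongside the message.
        let (ack, _ack_recv) = oneshot::channel();
        self.inbound_pipe
            .send(Req { msg, ack })
            .expect("state machine terminated");
    }

    fn try_recv(&mut self) -> Option<String> {
        self.outbound_pipe.try_recv().ok()
    }
}
```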
8 changes: 4 additions & 4 deletions node/actors/bft/src/chonky_bft/tests/commit.rs
@@ -67,8 +67,9 @@ async fn replica_commit_old() {

let mut replica_commit = util.new_replica_commit(ctx).await;
replica_commit.view.number = validator::ViewNumber(util.replica.view_number.0 - 1);
let replica_commit = util.owner_key().sign_msg(replica_commit);
let res = util.process_replica_commit(ctx, replica_commit).await;
let res = util
.process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit))
.await;

assert_matches!(
res,
@@ -160,13 +161,12 @@
async fn commit_invalid_message() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

let mut replica_commit = util.new_replica_commit(ctx).await;
replica_commit.view.genesis = rng.gen();
replica_commit.view.genesis = ctx.rng().gen();

let res = util
.process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit))
64 changes: 52 additions & 12 deletions node/actors/bft/src/chonky_bft/tests/mod.rs
@@ -3,7 +3,9 @@ use zksync_concurrency::{ctx, scope};
use zksync_consensus_roles::validator;

mod commit;
mod new_view;
mod proposal;
mod proposer;
mod timeout;

/// Sanity check of the happy path.
@@ -42,25 +44,16 @@

/// Sanity check of block production with reproposal.
#[tokio::test]
async fn reproposal_block_production() {
async fn block_production_timeout_reproposal() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new_many(ctx).await;
s.spawn_bg(runner.run(ctx));

let proposal = util.new_leader_proposal(ctx).await;
let replica_commit = util
.process_leader_proposal(ctx, util.leader_key().sign_msg(proposal.clone()))
.await
.unwrap()
.msg;
let replica_commit = util.new_replica_commit(ctx).await;
let mut timeout = util.new_replica_timeout();

let mut timeout = validator::ReplicaTimeout {
view: replica_commit.view.clone(),
high_vote: Some(replica_commit.clone()),
high_qc: util.replica.high_commit_qc.clone(),
};
for i in 0..util.genesis().validators.subquorum_threshold() as usize {
util.process_replica_timeout(ctx, util.keys[i].sign_msg(timeout.clone()))
.await
@@ -85,3 +78,50 @@
.await
.unwrap();
}

/// Testing liveness after the network becomes idle with replica in commit phase.
#[tokio::test]
async fn block_production_timeout_in_commit() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new_many(ctx).await;
s.spawn_bg(runner.run(ctx));

util.new_replica_commit(ctx).await;

// Replica is in `Phase::Commit`, but should still accept messages from newer views.
assert_eq!(util.replica.phase, validator::Phase::Commit);
util.produce_block_after_timeout(ctx).await;

Ok(())
})
.await
.unwrap();
}

/// Testing liveness after the network becomes idle with replica having some cached commit messages for the current view.
#[tokio::test]
async fn block_production_timeout_some_commits() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new_many(ctx).await;
s.spawn_bg(runner.run(ctx));

let replica_commit = util.new_replica_commit(ctx).await;
assert!(util
.process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit))
.await
.unwrap()
.is_none());

// Replica is in `Phase::Commit`, but should still accept prepares from newer views.
assert_eq!(util.replica.phase, validator::Phase::Commit);
util.produce_block_after_timeout(ctx).await;

Ok(())
})
.await
.unwrap();
}
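
All three new tests, like the renamed `block_production_timeout_reproposal`, follow one liveness pattern: accumulate enough signed `ReplicaTimeout` votes to advance the view, then assert a block is still produced. The reproposal rule they exercise can be shown in isolation (a toy sketch; the `u64` block stand-in and the threshold value are assumptions, not the crate's `Committee` arithmetic):

```rust
// Toy model: if some high_vote reaches the subquorum threshold among the
// collected timeout votes, the next leader must repropose that block
// instead of building a fresh payload.
use std::collections::HashMap;

fn reproposal_candidate(high_votes: &[Option<u64>], subquorum: usize) -> Option<u64> {
    let mut counts: HashMap<u64, usize> = HashMap::new();
    for vote in high_votes.iter().flatten() {
        *counts.entry(*vote).or_insert(0) += 1;
    }
    counts
        .into_iter()
        .find(|(_, count)| *count >= subquorum)
        .map(|(block, _)| block)
}

fn main() {
    // Six validators timed out; three carry the same high vote for block 7.
    let votes = [Some(7), Some(7), Some(7), None, Some(9), None];
    assert_eq!(reproposal_candidate(&votes, 3), Some(7));
    println!("repropose block {:?}", reproposal_candidate(&votes, 3));
}
```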