feat: Rework consensus storage crate #10

Merged · 17 commits · Oct 24, 2023
5 changes: 5 additions & 0 deletions node/Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions node/Cranky.toml
@@ -37,3 +37,8 @@ warn = [
# cargo group
"clippy::wildcard_dependencies",
]

allow = [
# Produces too many false positives.
"clippy::redundant_locals",
]
5 changes: 1 addition & 4 deletions node/actors/consensus/src/leader/state_machine.rs
@@ -5,7 +5,7 @@ use std::{
collections::{BTreeMap, HashMap},
unreachable,
};
use tracing::{instrument, warn};
use tracing::instrument;

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
@@ -72,8 +72,5 @@ impl StateMachine {
};
metrics::METRICS.leader_processing_latency[&label.with_result(&result)]
.observe_latency(ctx.now() - now);
if let Err(e) = result {
warn!("{}", e);
}
}
}
2 changes: 1 addition & 1 deletion node/actors/consensus/src/leader/tests.rs
@@ -11,7 +11,7 @@ async fn replica_commit() {

let keys: Vec<_> = (0..1).map(|_| rng.gen()).collect();
let (genesis, val_set) = testonly::make_genesis(&keys, vec![]);
let (mut consensus, _) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis);
let (mut consensus, _) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await;

let proposal_block_hash = rng.gen();

28 changes: 17 additions & 11 deletions node/actors/consensus/src/lib.rs
@@ -16,11 +16,12 @@
//! - Blog posts explaining [safety](https://seafooler.com/2022/01/24/understanding-safety-hotstuff/) and [responsiveness](https://seafooler.com/2022/04/02/understanding-responsiveness-hotstuff/)

use crate::io::{InputMessage, OutputMessage};
use anyhow::Context as _;
use concurrency::ctx;
use inner::ConsensusInner;
use roles::validator;
use std::sync::Arc;
use storage::Storage;
use storage::ReplicaStateStore;
use tracing::{info, instrument};
use utils::pipe::ActorPipe;

@@ -48,22 +49,22 @@ pub struct Consensus {
impl Consensus {
/// Creates a new Consensus struct.
#[instrument(level = "trace", ret)]
pub fn new(
pub async fn new(
ctx: &ctx::Ctx,
pipe: ActorPipe<InputMessage, OutputMessage>,
secret_key: validator::SecretKey,
validator_set: validator::ValidatorSet,
storage: Arc<Storage>,
) -> Self {
Consensus {
storage: Arc<dyn ReplicaStateStore>,
) -> anyhow::Result<Self> {
Ok(Consensus {
inner: ConsensusInner {
pipe,
secret_key,
validator_set,
},
replica: replica::StateMachine::new(storage),
replica: replica::StateMachine::new(ctx, storage).await?,
leader: leader::StateMachine::new(ctx),
}
})
}

/// Starts the Consensus actor. It will start running, processing incoming messages and
@@ -76,7 +77,9 @@ impl Consensus {
);

// We need to start the replica before processing inputs.
self.replica.start(ctx, &self.inner);
self.replica
.start(ctx, &self.inner)
.context("replica.start()")?;

// This is the infinite loop where the consensus actually runs. The validator waits for either
// a message from the network or for a timeout, and processes each accordingly.
@@ -99,18 +102,21 @@
match &req.msg.msg {
validator::ConsensusMsg::ReplicaPrepare(_)
| validator::ConsensusMsg::ReplicaCommit(_) => {
self.leader.process_input(ctx, &self.inner, req.msg)
self.leader.process_input(ctx, &self.inner, req.msg);
}
validator::ConsensusMsg::LeaderPrepare(_)
| validator::ConsensusMsg::LeaderCommit(_) => {
self.replica.process_input(ctx, &self.inner, Some(req.msg))
self.replica
.process_input(ctx, &self.inner, Some(req.msg))?;
}
}
// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
}
None => self.replica.process_input(ctx, &self.inner, None),
None => {
self.replica.process_input(ctx, &self.inner, None)?;
}
}
}
}
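For context, a caller-side sketch of how the reworked constructor is consumed; this is not part of the diff, and the helper name, item paths, and setup around it are assumptions for illustration only:

```rust
use std::sync::Arc;

use anyhow::Context as _;
use concurrency::ctx;
use consensus::{
    io::{InputMessage, OutputMessage},
    Consensus,
};
use roles::validator;
use storage::ReplicaStateStore;
use utils::pipe::ActorPipe;

// Hypothetical helper, not part of this PR: `Consensus::new` now reads the
// replica state from storage, so it is async and fallible instead of a plain
// constructor, and callers have to await it and propagate the error.
async fn build_consensus(
    ctx: &ctx::Ctx,
    pipe: ActorPipe<InputMessage, OutputMessage>,
    secret_key: validator::SecretKey,
    validator_set: validator::ValidatorSet,
    storage: Arc<dyn ReplicaStateStore>,
) -> anyhow::Result<Consensus> {
    Consensus::new(ctx, pipe, secret_key, validator_set, storage)
        .await
        .context("Consensus::new()")
}
```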
2 changes: 2 additions & 0 deletions node/actors/consensus/src/replica/error.rs
@@ -65,4 +65,6 @@ pub(crate) enum Error {
LeaderPrepareReproposalWhenFinalized,
#[error("received leader prepare message with block re-proposal of invalid block")]
LeaderPrepareReproposalInvalidBlock,
#[error("failed saving replica state to DB: {_0}")]
ReplicaStateSave(#[source] anyhow::Error),
}
5 changes: 2 additions & 3 deletions node/actors/consensus/src/replica/leader_commit.rs
@@ -1,14 +1,13 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, replica::error::Error};

use concurrency::ctx;
use roles::validator;
use tracing::instrument;

impl StateMachine {
/// Processes a leader commit message. We can approve this leader message even if we
/// don't have the block proposal stored. It is enough to see the justification.
#[instrument(level = "trace", ret)]
#[instrument(level = "trace", err)]
pub(crate) fn process_leader_commit(
&mut self,
ctx: &ctx::Ctx,
@@ -66,7 +65,7 @@ impl StateMachine {

// Start a new view. But first we skip to the view of this message.
self.view = view;
self.start_new_view(ctx, consensus);
self.start_new_view(ctx, consensus)?;

Ok(())
}
2 changes: 1 addition & 1 deletion node/actors/consensus/src/replica/leader_prepare.rs
@@ -206,7 +206,7 @@ impl StateMachine {
}

// Backup our state.
self.backup_state();
self.backup_state(ctx).map_err(Error::ReplicaStateSave)?;

// Send the replica message to the leader.
let output_message = ConsensusInputMessage {
19 changes: 12 additions & 7 deletions node/actors/consensus/src/replica/new_view.rs
@@ -1,15 +1,19 @@
use super::StateMachine;
use super::{error::Error, StateMachine};
use crate::ConsensusInner;
use concurrency::ctx;
use network::io::{ConsensusInputMessage, Target};
use roles::validator;
use tracing::{info, instrument};
use tracing::instrument;

impl StateMachine {
/// This method is used whenever we start a new view.
#[instrument(level = "trace", ret)]
pub(crate) fn start_new_view(&mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner) {
info!("Starting view {}", self.view.next().0);
/// This blocking method is used whenever we start a new view.
#[instrument(level = "trace", err)]
pub(crate) fn start_new_view(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
) -> Result<(), Error> {
tracing::info!("Starting view {}", self.view.next().0);

// Update the state machine.
let next_view = self.view.next();
@@ -22,7 +26,7 @@
.retain(|k, _| k > &self.high_qc.message.proposal_block_number);

// Backup our state.
self.backup_state();
self.backup_state(ctx).map_err(Error::ReplicaStateSave)?;

// Send the replica message to the next leader.
let output_message = ConsensusInputMessage {
@@ -41,5 +45,6 @@

// Reset the timer.
self.reset_timer(ctx);
Ok(())
}
}
64 changes: 43 additions & 21 deletions node/actors/consensus/src/replica/state_machine.rs
@@ -1,12 +1,12 @@
use crate::{metrics, ConsensusInner};
use concurrency::{ctx, metrics::LatencyHistogramExt as _, time};
use crate::{metrics, replica::error::Error, ConsensusInner};
use concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time};
use roles::validator;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use storage::Storage;
use tracing::{instrument, warn};
use storage::{ReplicaStateStore, StorageError};
use tracing::instrument;

/// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
/// for validating and voting on blocks. When participating in consensus we are always a replica.
@@ -26,14 +26,17 @@ pub(crate) struct StateMachine {
/// The deadline to receive an input message.
pub(crate) timeout_deadline: time::Deadline,
/// A reference to the storage module. We use it to backup the replica state.
pub(crate) storage: Arc<Storage>,
pub(crate) storage: Arc<dyn ReplicaStateStore>,
}

impl StateMachine {
/// Creates a new StateMachine struct. We try to recover a past state from the storage module;
/// otherwise we initialize the state machine with whatever head block we have.
pub(crate) fn new(storage: Arc<Storage>) -> Self {
match storage.get_replica_state() {
pub(crate) async fn new(
ctx: &ctx::Ctx,
storage: Arc<dyn ReplicaStateStore>,
) -> anyhow::Result<Self> {
Ok(match storage.replica_state(ctx).await? {
Some(backup) => Self {
view: backup.view,
phase: backup.phase,
@@ -44,8 +47,7 @@
storage,
},
None => {
let head = storage.get_head_block();

let head = storage.head_block(ctx).await?;
Self {
view: head.justification.message.view,
phase: validator::Phase::Prepare,
@@ -56,18 +58,23 @@
storage,
}
}
}
})
}

/// Starts the state machine. The replica state needs to be initialized before
/// we are able to process inputs. If we are in the genesis block, then we start a new view;
/// this will kick-start the consensus algorithm. Otherwise, we just start the timer.
#[instrument(level = "trace", ret)]
pub(crate) fn start(&mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner) {
pub(crate) fn start(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
) -> Result<(), Error> {
if self.view == validator::ViewNumber(0) {
self.start_new_view(ctx, consensus)
} else {
self.reset_timer(ctx)
self.reset_timer(ctx);
Ok(())
}
}

@@ -80,12 +87,12 @@
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
input: Option<validator::Signed<validator::ConsensusMsg>>,
) {
) -> anyhow::Result<()> {
let Some(signed_msg) = input else {
warn!("We timed out before receiving a message.");
tracing::warn!("We timed out before receiving a message.");
// Start new view.
self.start_new_view(ctx, consensus);
return;
self.start_new_view(ctx, consensus)?;
return Ok(());
};

let now = ctx.now();
@@ -102,14 +109,19 @@
};
metrics::METRICS.replica_processing_latency[&label.with_result(&result)]
.observe_latency(ctx.now() - now);
// All errors from processing inputs are recoverable, so we just log them.
if let Err(e) = result {
warn!("{}", e);
match result {
Ok(()) => Ok(()),
Err(err @ Error::ReplicaStateSave(_)) => Err(err.into()),
Err(err) => {
// Other errors from processing inputs are recoverable, so we just log them.
tracing::warn!("{err}");
Ok(())
}
}
}

/// Backs up the replica state to disk.
pub(crate) fn backup_state(&self) {
pub(crate) fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let backup = storage::ReplicaState {
view: self.view,
phase: self.phase,
Expand All @@ -118,6 +130,16 @@ impl StateMachine {
block_proposal_cache: self.block_proposal_cache.clone(),
};

self.storage.put_replica_state(&backup);
let store_result = scope::run_blocking!(ctx, |ctx, s| {
let backup_future = self.storage.put_replica_state(ctx, &backup);
s.spawn(backup_future).join(ctx).block()?;
Ok(())
});
match store_result {
Ok(()) => { /* Everything went fine */ }
Err(StorageError::Canceled(_)) => tracing::trace!("Storing replica state was canceled"),
Err(StorageError::Database(err)) => return Err(err),
}
Ok(())
}
}
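To make the new storage interaction easier to follow, here is a rough sketch of the `ReplicaStateStore` surface as implied by the call sites above (`replica_state`, `head_block`, `put_replica_state`, and the `StorageError` variants matched in `backup_state`); the real definitions live in the `storage` crate and may differ in bounds, payloads, and naming:

```rust
use async_trait::async_trait;
use concurrency::ctx;
use roles::validator;
// `ReplicaState` (view, phase, high_vote, high_qc, block_proposal_cache) is the
// struct persisted by `backup_state` above.
use storage::ReplicaState;

/// Error type assumed from the `match` in `backup_state`: `Database` must carry
/// an `anyhow::Error` (it is returned as one); the `Canceled` payload is assumed.
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    #[error("storage operation was canceled")]
    Canceled(#[source] anyhow::Error),
    #[error("database error")]
    Database(#[source] anyhow::Error),
}

#[async_trait]
pub trait ReplicaStateStore: std::fmt::Debug + Send + Sync {
    /// Last backed-up replica state, if any; used by `StateMachine::new` to recover.
    async fn replica_state(&self, ctx: &ctx::Ctx)
        -> Result<Option<ReplicaState>, StorageError>;
    /// Head block used to bootstrap a fresh state machine when no backup exists.
    async fn head_block(&self, ctx: &ctx::Ctx) -> Result<validator::FinalBlock, StorageError>;
    /// Persists the replica state so it survives restarts.
    async fn put_replica_state(
        &self,
        ctx: &ctx::Ctx,
        replica_state: &ReplicaState,
    ) -> Result<(), StorageError>;
}
```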
19 changes: 16 additions & 3 deletions node/actors/consensus/src/replica/tests.rs
@@ -1,5 +1,5 @@
use crate::testonly;
use concurrency::{ctx, time};
use concurrency::{ctx, scope, time};
use network::io::{ConsensusInputMessage, Target};
use rand::Rng;
use roles::validator::{self, ViewNumber};
@@ -11,13 +11,26 @@ async fn start_new_view_not_leader() {

let keys: Vec<_> = (0..4).map(|_| rng.gen()).collect();
let (genesis, val_set) = testonly::make_genesis(&keys, vec![]);
let (mut consensus, mut pipe) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis);
let (mut consensus, mut pipe) =
testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await;
// TODO: this test assumes a specific implementation of the leader schedule.
// Make it leader-schedule agnostic (use epoch to select a specific view).
consensus.replica.view = ViewNumber(1);
consensus.replica.high_qc = rng.gen();
consensus.replica.high_qc.message.view = ViewNumber(0);
consensus.replica.start_new_view(ctx, &consensus.inner);

scope::run!(ctx, |ctx, s| {
s.spawn_blocking(|| {
consensus
.replica
.start_new_view(ctx, &consensus.inner)
.unwrap();
Ok(())
})
Collaborator review comment: nit: we might want to introduce some syntax sugar for this construction.

.join(ctx)
})
.await
.unwrap();

let test_new_view_msg = ConsensusInputMessage {
message: consensus