Skip to content

Commit

Permalink
feat: Rework consensus storage crate (#10)
Browse files Browse the repository at this point in the history
# What ❔

- Extracts the storage API into trait(s) and makes them async.
- Introduces buffered storage (i.e., storage that schedules sequential block
execution).

## Why ❔

Part of preparations for the integration with the server codebase.
  • Loading branch information
slowli authored Oct 24, 2023
1 parent 75be6d5 commit fe38f3d
Show file tree
Hide file tree
Showing 29 changed files with 910 additions and 530 deletions.
5 changes: 5 additions & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/Cranky.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ warn = [
# cargo group
"clippy::wildcard_dependencies",
]

allow = [
# Produces too many false positives.
"clippy::redundant_locals",
]
5 changes: 1 addition & 4 deletions node/actors/consensus/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
collections::{BTreeMap, HashMap},
unreachable,
};
use tracing::{instrument, warn};
use tracing::instrument;

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
Expand Down Expand Up @@ -72,8 +72,5 @@ impl StateMachine {
};
metrics::METRICS.leader_processing_latency[&label.with_result(&result)]
.observe_latency(ctx.now() - now);
if let Err(e) = result {
warn!("{}", e);
}
}
}
2 changes: 1 addition & 1 deletion node/actors/consensus/src/leader/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn replica_commit() {

let keys: Vec<_> = (0..1).map(|_| rng.gen()).collect();
let (genesis, val_set) = testonly::make_genesis(&keys, vec![]);
let (mut consensus, _) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis);
let (mut consensus, _) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await;

let proposal_block_hash = rng.gen();

Expand Down
28 changes: 17 additions & 11 deletions node/actors/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
//! - Blog posts explaining [safety](https://seafooler.com/2022/01/24/understanding-safety-hotstuff/) and [responsiveness](https://seafooler.com/2022/04/02/understanding-responsiveness-hotstuff/)

use crate::io::{InputMessage, OutputMessage};
use anyhow::Context as _;
use concurrency::ctx;
use inner::ConsensusInner;
use roles::validator;
use std::sync::Arc;
use storage::Storage;
use storage::ReplicaStateStore;
use tracing::{info, instrument};
use utils::pipe::ActorPipe;

Expand Down Expand Up @@ -48,22 +49,22 @@ pub struct Consensus {
impl Consensus {
/// Creates a new Consensus struct.
#[instrument(level = "trace", ret)]
pub fn new(
pub async fn new(
ctx: &ctx::Ctx,
pipe: ActorPipe<InputMessage, OutputMessage>,
secret_key: validator::SecretKey,
validator_set: validator::ValidatorSet,
storage: Arc<Storage>,
) -> Self {
Consensus {
storage: Arc<dyn ReplicaStateStore>,
) -> anyhow::Result<Self> {
Ok(Consensus {
inner: ConsensusInner {
pipe,
secret_key,
validator_set,
},
replica: replica::StateMachine::new(storage),
replica: replica::StateMachine::new(ctx, storage).await?,
leader: leader::StateMachine::new(ctx),
}
})
}

/// Starts the Consensus actor. It will start running, processing incoming messages and
Expand All @@ -76,7 +77,9 @@ impl Consensus {
);

// We need to start the replica before processing inputs.
self.replica.start(ctx, &self.inner);
self.replica
.start(ctx, &self.inner)
.context("replica.start()")?;

// This is the infinite loop where the consensus actually runs. The validator waits for either
// a message from the network or for a timeout, and processes each accordingly.
Expand All @@ -99,18 +102,21 @@ impl Consensus {
match &req.msg.msg {
validator::ConsensusMsg::ReplicaPrepare(_)
| validator::ConsensusMsg::ReplicaCommit(_) => {
self.leader.process_input(ctx, &self.inner, req.msg)
self.leader.process_input(ctx, &self.inner, req.msg);
}
validator::ConsensusMsg::LeaderPrepare(_)
| validator::ConsensusMsg::LeaderCommit(_) => {
self.replica.process_input(ctx, &self.inner, Some(req.msg))
self.replica
.process_input(ctx, &self.inner, Some(req.msg))?;
}
}
// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
}
None => self.replica.process_input(ctx, &self.inner, None),
None => {
self.replica.process_input(ctx, &self.inner, None)?;
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/actors/consensus/src/replica/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ pub(crate) enum Error {
LeaderPrepareReproposalWhenFinalized,
#[error("received leader prepare message with block re-proposal of invalid block")]
LeaderPrepareReproposalInvalidBlock,
#[error("failed saving replica state to DB: {_0}")]
ReplicaStateSave(#[source] anyhow::Error),
}
5 changes: 2 additions & 3 deletions node/actors/consensus/src/replica/leader_commit.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, replica::error::Error};

use concurrency::ctx;
use roles::validator;
use tracing::instrument;

impl StateMachine {
/// Processes a leader commit message. We can approve this leader message even if we
/// don't have the block proposal stored. It is enough to see the justification.
#[instrument(level = "trace", ret)]
#[instrument(level = "trace", err)]
pub(crate) fn process_leader_commit(
&mut self,
ctx: &ctx::Ctx,
Expand Down Expand Up @@ -66,7 +65,7 @@ impl StateMachine {

// Start a new view. But first we skip to the view of this message.
self.view = view;
self.start_new_view(ctx, consensus);
self.start_new_view(ctx, consensus)?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion node/actors/consensus/src/replica/leader_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl StateMachine {
}

// Backup our state.
self.backup_state();
self.backup_state(ctx).map_err(Error::ReplicaStateSave)?;

// Send the replica message to the leader.
let output_message = ConsensusInputMessage {
Expand Down
19 changes: 12 additions & 7 deletions node/actors/consensus/src/replica/new_view.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use super::StateMachine;
use super::{error::Error, StateMachine};
use crate::ConsensusInner;
use concurrency::ctx;
use network::io::{ConsensusInputMessage, Target};
use roles::validator;
use tracing::{info, instrument};
use tracing::instrument;

impl StateMachine {
/// This method is used whenever we start a new view.
#[instrument(level = "trace", ret)]
pub(crate) fn start_new_view(&mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner) {
info!("Starting view {}", self.view.next().0);
/// This blocking method is used whenever we start a new view.
#[instrument(level = "trace", err)]
pub(crate) fn start_new_view(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
) -> Result<(), Error> {
tracing::info!("Starting view {}", self.view.next().0);

// Update the state machine.
let next_view = self.view.next();
Expand All @@ -22,7 +26,7 @@ impl StateMachine {
.retain(|k, _| k > &self.high_qc.message.proposal_block_number);

// Backup our state.
self.backup_state();
self.backup_state(ctx).map_err(Error::ReplicaStateSave)?;

// Send the replica message to the next leader.
let output_message = ConsensusInputMessage {
Expand All @@ -41,5 +45,6 @@ impl StateMachine {

// Reset the timer.
self.reset_timer(ctx);
Ok(())
}
}
64 changes: 43 additions & 21 deletions node/actors/consensus/src/replica/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{metrics, ConsensusInner};
use concurrency::{ctx, metrics::LatencyHistogramExt as _, time};
use crate::{metrics, replica::error::Error, ConsensusInner};
use concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time};
use roles::validator;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use storage::Storage;
use tracing::{instrument, warn};
use storage::{ReplicaStateStore, StorageError};
use tracing::instrument;

/// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
/// for validating and voting on blocks. When participating in consensus we are always a replica.
Expand All @@ -26,14 +26,17 @@ pub(crate) struct StateMachine {
/// The deadline to receive an input message.
pub(crate) timeout_deadline: time::Deadline,
/// A reference to the storage module. We use it to backup the replica state.
pub(crate) storage: Arc<Storage>,
pub(crate) storage: Arc<dyn ReplicaStateStore>,
}

impl StateMachine {
/// Creates a new StateMachine struct. We try to recover a past state from the storage module,
/// otherwise we initialize the state machine with whatever head block we have.
pub(crate) fn new(storage: Arc<Storage>) -> Self {
match storage.get_replica_state() {
pub(crate) async fn new(
ctx: &ctx::Ctx,
storage: Arc<dyn ReplicaStateStore>,
) -> anyhow::Result<Self> {
Ok(match storage.replica_state(ctx).await? {
Some(backup) => Self {
view: backup.view,
phase: backup.phase,
Expand All @@ -44,8 +47,7 @@ impl StateMachine {
storage,
},
None => {
let head = storage.get_head_block();

let head = storage.head_block(ctx).await?;
Self {
view: head.justification.message.view,
phase: validator::Phase::Prepare,
Expand All @@ -56,18 +58,23 @@ impl StateMachine {
storage,
}
}
}
})
}

/// Starts the state machine. The replica state needs to be initialized before
/// we are able to process inputs. If we are in the genesis block, then we start a new view,
/// this will kick start the consensus algorithm. Otherwise, we just start the timer.
#[instrument(level = "trace", ret)]
pub(crate) fn start(&mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner) {
pub(crate) fn start(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
) -> Result<(), Error> {
if self.view == validator::ViewNumber(0) {
self.start_new_view(ctx, consensus)
} else {
self.reset_timer(ctx)
self.reset_timer(ctx);
Ok(())
}
}

Expand All @@ -80,12 +87,12 @@ impl StateMachine {
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
input: Option<validator::Signed<validator::ConsensusMsg>>,
) {
) -> anyhow::Result<()> {
let Some(signed_msg) = input else {
warn!("We timed out before receiving a message.");
tracing::warn!("We timed out before receiving a message.");
// Start new view.
self.start_new_view(ctx, consensus);
return;
self.start_new_view(ctx, consensus)?;
return Ok(());
};

let now = ctx.now();
Expand All @@ -102,14 +109,19 @@ impl StateMachine {
};
metrics::METRICS.replica_processing_latency[&label.with_result(&result)]
.observe_latency(ctx.now() - now);
// All errors from processing inputs are recoverable, so we just log them.
if let Err(e) = result {
warn!("{}", e);
match result {
Ok(()) => Ok(()),
Err(err @ Error::ReplicaStateSave(_)) => Err(err.into()),
Err(err) => {
// Other errors from processing inputs are recoverable, so we just log them.
tracing::warn!("{err}");
Ok(())
}
}
}

/// Backups the replica state to disk.
pub(crate) fn backup_state(&self) {
pub(crate) fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let backup = storage::ReplicaState {
view: self.view,
phase: self.phase,
Expand All @@ -118,6 +130,16 @@ impl StateMachine {
block_proposal_cache: self.block_proposal_cache.clone(),
};

self.storage.put_replica_state(&backup);
let store_result = scope::run_blocking!(ctx, |ctx, s| {
let backup_future = self.storage.put_replica_state(ctx, &backup);
s.spawn(backup_future).join(ctx).block()?;
Ok(())
});
match store_result {
Ok(()) => { /* Everything went fine */ }
Err(StorageError::Canceled(_)) => tracing::trace!("Storing replica state was canceled"),
Err(StorageError::Database(err)) => return Err(err),
}
Ok(())
}
}
19 changes: 16 additions & 3 deletions node/actors/consensus/src/replica/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::testonly;
use concurrency::{ctx, time};
use concurrency::{ctx, scope, time};
use network::io::{ConsensusInputMessage, Target};
use rand::Rng;
use roles::validator::{self, ViewNumber};
Expand All @@ -11,13 +11,26 @@ async fn start_new_view_not_leader() {

let keys: Vec<_> = (0..4).map(|_| rng.gen()).collect();
let (genesis, val_set) = testonly::make_genesis(&keys, vec![]);
let (mut consensus, mut pipe) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis);
let (mut consensus, mut pipe) =
testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await;
// TODO: this test assumes a specific implementation of the leader schedule.
// Make it leader-schedule agnostic (use epoch to select a specific view).
consensus.replica.view = ViewNumber(1);
consensus.replica.high_qc = rng.gen();
consensus.replica.high_qc.message.view = ViewNumber(0);
consensus.replica.start_new_view(ctx, &consensus.inner);

scope::run!(ctx, |ctx, s| {
s.spawn_blocking(|| {
consensus
.replica
.start_new_view(ctx, &consensus.inner)
.unwrap();
Ok(())
})
.join(ctx)
})
.await
.unwrap();

let test_new_view_msg = ConsensusInputMessage {
message: consensus
Expand Down
Loading

0 comments on commit fe38f3d

Please sign in to comment.