Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Full node mode #13

Merged
merged 19 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/load_testing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ jobs:
- name: Build executor binary
working-directory: node
run: |
build_output=$(cargo build --release -p executor --bin executor --message-format=json) || exit 1
build_output=$(cargo build --release -p tools --bin executor --message-format=json) || exit 1
echo "$build_output" | jq -r 'select(.executable != null) | .executable' \
| while read binary; do
cp "$binary" artifacts/binaries/
Expand Down
6 changes: 0 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,9 @@
# will have compiled files and executables
target/

# These are backup files generated by rustfmt
/*.rs.bk

# Debug logs
logs/

# Config files
config/

# Local load test leftovers
.terraform
.ssh
Expand Down
2 changes: 1 addition & 1 deletion docker/compose.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ WORKDIR /usr/src/myapp/artifacts/$node/
RUN mkdir /usr/src/myapp/artifacts/$node/logs/

# You can ignore this command. In docker-compose.yml file we have specified the different command
CMD ["./executor", "0"]
CMD ["./executor", "0"]
2 changes: 1 addition & 1 deletion docker/localenv.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ RUN cargo run -p tools --bin localnet_config -- --nodes=$nodes

# Build binary file in release mode and create a main release binary
WORKDIR /usr/src/myapp/node
RUN cargo build -p executor --release
RUN cargo build -p tools --bin executor --release

# Create the artifacts directory
WORKDIR /usr/src/myapp/node/
Expand Down
58 changes: 11 additions & 47 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ time = "0.3.23"
tokio = { version = "1.28.1", features = ["full"] }
tracing = { version = "0.1.37", features = ["attributes"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
vise = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" }
vise-exporter = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" }
vise = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "dd05139b76ab0843443ab3ff730174942c825dae" }
vise-exporter = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "dd05139b76ab0843443ab3ff730174942c825dae" }

# Note that "bench" profile inherits from "release" profile and
# "test" profile inherits from "dev" profile.
Expand Down
5 changes: 2 additions & 3 deletions node/actors/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use anyhow::Context as _;
use concurrency::ctx;
use inner::ConsensusInner;
use roles::validator;
use std::sync::Arc;
use storage::ReplicaStateStore;
use storage::FallbackReplicaStateStore;
use tracing::{info, instrument};
use utils::pipe::ActorPipe;

Expand Down Expand Up @@ -54,7 +53,7 @@ impl Consensus {
pipe: ActorPipe<InputMessage, OutputMessage>,
secret_key: validator::SecretKey,
validator_set: validator::ValidatorSet,
storage: Arc<dyn ReplicaStateStore>,
storage: FallbackReplicaStateStore,
) -> anyhow::Result<Self> {
Ok(Consensus {
inner: ConsensusInner {
Expand Down
59 changes: 21 additions & 38 deletions node/actors/consensus/src/replica/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ use crate::{metrics, ConsensusInner};
use anyhow::Context as _;
use concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time};
use roles::validator;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use storage::{ReplicaStateStore, StorageError};
use std::collections::{BTreeMap, HashMap};
use storage::{FallbackReplicaStateStore, StorageError};
use tracing::instrument;

/// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
Expand All @@ -27,47 +24,33 @@ pub(crate) struct StateMachine {
/// The deadline to receive an input message.
pub(crate) timeout_deadline: time::Deadline,
/// A reference to the storage module. We use it to backup the replica state.
pub(crate) storage: Arc<dyn ReplicaStateStore>,
pub(crate) storage: FallbackReplicaStateStore,
}

impl StateMachine {
/// Creates a new StateMachine struct. We try to recover a past state from the storage module,
/// otherwise we initialize the state machine with whatever head block we have.
pub(crate) async fn new(
ctx: &ctx::Ctx,
storage: Arc<dyn ReplicaStateStore>,
storage: FallbackReplicaStateStore,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why "fallback"? It is supposed to be the primary source of state, with empty state being the fallback, no?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or you mean WithFallback? But then do we need that verbosity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I've meant "store with fallback" (fallback being based on a BlockStore). Would something like FullReplicaStateStore work better?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that would be better. But why not just ReplicaStateStore? The fallback is an implementation detail from the pov of this function. Perhaps we should make FallbackReplicaStateStore into a trait instead of ReplicaStateStore?

) -> anyhow::Result<Self> {
Ok(match storage.replica_state(ctx).await? {
Some(backup) => {
let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
for p in backup.proposals {
block_proposal_cache
.entry(p.number)
.or_default()
.insert(p.payload.hash(), p.payload);
}
Self {
view: backup.view,
phase: backup.phase,
high_vote: backup.high_vote,
high_qc: backup.high_qc,
block_proposal_cache,
timeout_deadline: time::Deadline::Infinite,
storage,
}
}
None => {
let head = storage.head_block(ctx).await?;
Self {
view: head.justification.message.view,
phase: validator::Phase::Prepare,
high_vote: head.justification.message,
high_qc: head.justification,
block_proposal_cache: BTreeMap::new(),
timeout_deadline: time::Deadline::Infinite,
storage,
}
}
let backup = storage.replica_state(ctx).await?;
let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
for proposal in backup.proposals {
block_proposal_cache
.entry(proposal.number)
.or_default()
.insert(proposal.payload.hash(), proposal.payload);
}

Ok(Self {
view: backup.view,
phase: backup.phase,
high_vote: backup.high_vote,
high_qc: backup.high_qc,
block_proposal_cache,
timeout_deadline: time::Deadline::Infinite,
storage,
})
}

Expand Down
4 changes: 2 additions & 2 deletions node/actors/consensus/src/testonly/make.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
use concurrency::ctx;
use roles::validator;
use std::sync::Arc;
use storage::RocksdbStorage;
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use tempfile::tempdir;
use utils::pipe::{self, DispatcherPipe};

Expand All @@ -33,7 +33,7 @@ pub async fn make_consensus(
consensus_pipe,
key.clone(),
validator_set.clone(),
Arc::new(storage),
FallbackReplicaStateStore::from_store(Arc::new(storage)),
);
let consensus = consensus
.await
Expand Down
2 changes: 1 addition & 1 deletion node/actors/consensus/src/testonly/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Node {
network_pipe: DispatcherPipe<network::io::InputMessage, network::io::OutputMessage>,
metrics: channel::UnboundedSender<Metrics>,
) -> anyhow::Result<()> {
let key = self.net.state().cfg().consensus.key.public();
let key = self.net.consensus_config().key.public();
let rng = &mut ctx.rng();
let mut net_recv = network_pipe.recv;
let net_send = network_pipe.send;
Expand Down
27 changes: 14 additions & 13 deletions node/actors/consensus/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use storage::RocksdbStorage;
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use tracing::Instrument as _;
use utils::pipe;

Expand Down Expand Up @@ -44,8 +44,8 @@ impl Test {
// Get only the honest replicas.
let honest: HashSet<_> = nodes
.iter()
.filter(|n| n.behavior == Behavior::Honest)
.map(|n| n.net.state().cfg().consensus.key.public())
.filter(|node| node.behavior == Behavior::Honest)
.map(|node| node.net.consensus_config().key.public())
.collect();
assert!(!honest.is_empty());

Expand Down Expand Up @@ -83,17 +83,18 @@ async fn run_nodes(
) -> anyhow::Result<()> {
let keys: Vec<_> = nodes
.iter()
.map(|r| r.net.state().cfg().consensus.key.clone())
.map(|node| node.net.consensus_config().key.clone())
.collect();
let (genesis_block, _) = testonly::make_genesis(&keys, validator::Payload(vec![]));
let network_ready = signal::Once::new();
let mut network_pipes = HashMap::new();
let mut network_send = HashMap::new();
let mut network_recv = HashMap::new();
scope::run!(ctx, |ctx, s| async {
for (i, n) in nodes.iter().enumerate() {
let validator_key = n.net.state().cfg().consensus.key.clone();
let validator_set = n.net.state().cfg().consensus.validators.clone();
for (i, node) in nodes.iter().enumerate() {
let consensus_config = node.net.consensus_config();
let validator_key = consensus_config.key.clone();
let validator_set = node.net.to_config().validators;

let (consensus_actor_pipe, consensus_pipe) = pipe::new();
let (network_actor_pipe, network_pipe) = pipe::new();
Expand All @@ -105,12 +106,12 @@ async fn run_nodes(
RocksdbStorage::new(ctx, &genesis_block, &dir.path().join("storage"))
.await
.context("RocksdbStorage")?;
let storage = Arc::new(storage);
let storage = FallbackReplicaStateStore::from_store(Arc::new(storage));

let consensus = Consensus::new(
ctx,
consensus_actor_pipe,
n.net.state().cfg().consensus.key.clone(),
node.net.consensus_config().key.clone(),
validator_set,
storage,
)
Expand All @@ -120,7 +121,7 @@ async fn run_nodes(
scope::run!(ctx, |ctx, s| async {
network_ready.recv(ctx).await?;
s.spawn_blocking(|| consensus.run(ctx).context("consensus.run()"));
n.run_executor(ctx, consensus_pipe, network_pipe, metrics.clone())
node.run_executor(ctx, consensus_pipe, network_pipe, metrics.clone())
.await
.context("executor.run()")
})
Expand All @@ -131,10 +132,10 @@ async fn run_nodes(
}
match network {
Network::Real => {
for (i, n) in nodes.iter().enumerate() {
let state = n.net.state().clone();
for (i, node) in nodes.iter().enumerate() {
let state = node.net.state().clone();
let pipe = network_pipes
.remove(&state.cfg().consensus.key.public())
.remove(&node.net.consensus_config().key.public())
.unwrap();
s.spawn(
async {
Expand Down
Loading
Loading