From e438036778c5266235fff468665e169b29d8fcbf Mon Sep 17 00:00:00 2001
From: ss-es <155648797+ss-es@users.noreply.github.com>
Date: Wed, 23 Oct 2024 12:40:09 -0400
Subject: [PATCH] Make `Membership::leader()` return a `Result<_>` (#3738)
---
Cargo.lock | 11 +
Cargo.toml | 1 +
clippy.toml | 1 +
crates/example-types/src/node_types.rs | 16 +-
crates/hotshot/Cargo.toml | 1 +
crates/hotshot/src/tasks/mod.rs | 3 +-
.../traits/election/randomized_committee.rs | 5 +-
.../src/traits/election/static_committee.rs | 5 +-
.../static_committee_leader_two_views.rs | 6 +-
.../src/traits/networking/libp2p_network.rs | 10 +-
crates/hotshot/src/types/handle.rs | 8 +-
.../src/network/behaviours/dht/store.rs | 12 +-
crates/task-impls/Cargo.toml | 1 +
crates/task-impls/src/consensus/handlers.rs | 31 +-
crates/task-impls/src/consensus/mod.rs | 10 +-
crates/task-impls/src/consensus2/handlers.rs | 280 +++++++++++++++++
crates/task-impls/src/da.rs | 170 +++++-----
crates/task-impls/src/helpers.rs | 64 ++--
crates/task-impls/src/network.rs | 188 +++++++----
.../src/quorum_proposal/handlers.rs | 21 +-
crates/task-impls/src/quorum_proposal/mod.rs | 126 ++++----
.../src/quorum_proposal_recv/handlers.rs | 14 +-
.../src/quorum_proposal_recv/mod.rs | 2 +-
crates/task-impls/src/quorum_vote/handlers.rs | 8 +-
crates/task-impls/src/quorum_vote/mod.rs | 155 ++++-----
crates/task-impls/src/request.rs | 2 +-
crates/task-impls/src/rewind.rs | 2 +-
crates/task-impls/src/transactions.rs | 88 +++---
crates/task-impls/src/upgrade.rs | 127 ++++----
crates/task-impls/src/vid.rs | 2 +-
crates/task-impls/src/view_sync.rs | 182 ++++++-----
crates/task-impls/src/vote_collection.rs | 180 +++++++----
crates/task/Cargo.toml | 1 +
crates/task/src/task.rs | 2 +-
crates/testing/src/view_generator.rs | 12 +-
crates/testing/tests/tests_1/da_task.rs | 3 +-
crates/testing/tests/tests_1/network_task.rs | 3 +-
.../tests/tests_1/quorum_proposal_task.rs | 3 +-
.../testing/tests/tests_1/transaction_task.rs | 3 +-
.../tests_1/upgrade_task_with_proposal.rs | 3 +-
crates/testing/tests/tests_1/vid_task.rs | 3 +-
.../tests/tests_1/vote_dependency_handle.rs | 3 +-
.../testing/tests/tests_3/byzantine_tests.rs | 2 +-
.../testing/tests/tests_3/memory_network.rs | 3 +-
crates/types/Cargo.toml | 1 +
crates/types/src/consensus.rs | 70 ++---
crates/types/src/data.rs | 50 +--
crates/types/src/message.rs | 14 +-
crates/types/src/simple_certificate.rs | 2 +-
crates/types/src/simple_vote.rs | 6 +-
crates/types/src/traits/election.rs | 9 +-
crates/types/src/vote.rs | 2 +-
crates/utils/Cargo.toml | 11 +
crates/utils/src/anytrace.rs | 191 ++++++++++++
crates/utils/src/anytrace/macros.rs | 293 ++++++++++++++++++
crates/utils/src/lib.rs | 4 +
56 files changed, 1683 insertions(+), 743 deletions(-)
create mode 100644 clippy.toml
create mode 100644 crates/task-impls/src/consensus2/handlers.rs
create mode 100644 crates/utils/Cargo.toml
create mode 100644 crates/utils/src/anytrace.rs
create mode 100644 crates/utils/src/anytrace/macros.rs
create mode 100644 crates/utils/src/lib.rs
diff --git a/Cargo.lock b/Cargo.lock
index e7ddaefcdc..ee0bdeb3bc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3137,6 +3137,7 @@ dependencies = [
"toml",
"tracing",
"url",
+ "utils",
"vbs",
]
@@ -3330,6 +3331,7 @@ dependencies = [
"futures",
"tokio",
"tracing",
+ "utils",
]
[[package]]
@@ -3364,6 +3366,7 @@ dependencies = [
"tokio",
"tracing",
"url",
+ "utils",
"vbs",
"vec1",
]
@@ -3467,6 +3470,7 @@ dependencies = [
"tracing",
"typenum",
"url",
+ "utils",
"vbs",
"vec1",
]
@@ -8382,6 +8386,13 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
+[[package]]
+name = "utils"
+version = "0.5.78"
+dependencies = [
+ "tracing",
+]
+
[[package]]
name = "uuid"
version = "1.8.0"
diff --git a/Cargo.toml b/Cargo.toml
index 099fff2882..303ad5a100 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ members = [
"crates/types",
"crates/builder-api",
"crates/fakeapi",
+ "crates/utils",
]
resolver = "2"
diff --git a/clippy.toml b/clippy.toml
new file mode 100644
index 0000000000..edd8ddcbdf
--- /dev/null
+++ b/clippy.toml
@@ -0,0 +1 @@
+allowed-wildcard-imports = [ "utils", "hotshot_task_impls", "hotshot_types" ]
diff --git a/crates/example-types/src/node_types.rs b/crates/example-types/src/node_types.rs
index 1ab2446c12..8884d3e7ce 100644
--- a/crates/example-types/src/node_types.rs
+++ b/crates/example-types/src/node_types.rs
@@ -4,12 +4,6 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
-use crate::{
- auction_results_provider_types::{TestAuctionResult, TestAuctionResultsProvider},
- block_types::{TestBlockHeader, TestBlockPayload, TestTransaction},
- state_types::{TestInstanceState, TestValidatedState},
- storage_types::TestStorage,
-};
use hotshot::traits::{
election::{
randomized_committee::RandomizedCommittee, static_committee::StaticCommittee,
@@ -18,15 +12,21 @@ use hotshot::traits::{
implementations::{CombinedNetworks, Libp2pNetwork, MemoryNetwork, PushCdnNetwork},
NodeImplementation,
};
-use hotshot_types::data::EpochNumber;
use hotshot_types::{
- data::ViewNumber,
+ data::{EpochNumber, ViewNumber},
signature_key::{BLSPubKey, BuilderKey},
traits::node_implementation::{NodeType, Versions},
};
use serde::{Deserialize, Serialize};
use vbs::version::StaticVersion;
+use crate::{
+ auction_results_provider_types::{TestAuctionResult, TestAuctionResultsProvider},
+ block_types::{TestBlockHeader, TestBlockPayload, TestTransaction},
+ state_types::{TestInstanceState, TestValidatedState},
+ storage_types::TestStorage,
+};
+
#[derive(
Copy,
Clone,
diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml
index 874e94939d..c3e345f43a 100644
--- a/crates/hotshot/Cargo.toml
+++ b/crates/hotshot/Cargo.toml
@@ -58,6 +58,7 @@ sha2 = { workspace = true }
url = { workspace = true }
num_enum = "0.7"
parking_lot = "0.12"
+utils = { path = "../utils" }
[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies]
tokio = { workspace = true }
diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs
index 33341f9013..cca95d3c76 100644
--- a/crates/hotshot/src/tasks/mod.rs
+++ b/crates/hotshot/src/tasks/mod.rs
@@ -11,8 +11,7 @@ pub mod task_state;
use std::{fmt::Debug, sync::Arc, time::Duration};
use async_broadcast::{broadcast, RecvError};
-use async_compatibility_layer::art::async_sleep;
-use async_compatibility_layer::art::async_spawn;
+use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::{
diff --git a/crates/hotshot/src/traits/election/randomized_committee.rs b/crates/hotshot/src/traits/election/randomized_committee.rs
index 4fed098e9c..d664e2a6e8 100644
--- a/crates/hotshot/src/traits/election/randomized_committee.rs
+++ b/crates/hotshot/src/traits/election/randomized_committee.rs
@@ -17,6 +17,7 @@ use hotshot_types::{
PeerConfig,
};
use rand::{rngs::StdRng, Rng};
+use utils::anytrace::Result;
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
@@ -142,7 +143,7 @@ impl Membership for RandomizedCommittee {
&self,
view_number: TYPES::View,
_epoch: ::Epoch,
- ) -> TYPES::SignatureKey {
+ ) -> Result<TYPES::SignatureKey> {
let mut rng: StdRng = rand::SeedableRng::seed_from_u64(*view_number);
let randomized_view_number: u64 = rng.gen_range(0..=u64::MAX);
@@ -151,7 +152,7 @@ impl Membership for RandomizedCommittee {
let res = self.eligible_leaders[index].clone();
- TYPES::SignatureKey::public_key(&res)
+ Ok(TYPES::SignatureKey::public_key(&res))
}
/// Get the total number of nodes in the committee
diff --git a/crates/hotshot/src/traits/election/static_committee.rs b/crates/hotshot/src/traits/election/static_committee.rs
index 2ef52a66e2..acacc51cb6 100644
--- a/crates/hotshot/src/traits/election/static_committee.rs
+++ b/crates/hotshot/src/traits/election/static_committee.rs
@@ -16,6 +16,7 @@ use hotshot_types::{
},
PeerConfig,
};
+use utils::anytrace::Result;
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
@@ -140,11 +141,11 @@ impl Membership for StaticCommittee {
&self,
view_number: TYPES::View,
_epoch: ::Epoch,
- ) -> TYPES::SignatureKey {
+ ) -> Result<TYPES::SignatureKey> {
#[allow(clippy::cast_possible_truncation)]
let index = *view_number as usize % self.eligible_leaders.len();
let res = self.eligible_leaders[index].clone();
- TYPES::SignatureKey::public_key(&res)
+ Ok(TYPES::SignatureKey::public_key(&res))
}
/// Get the total number of nodes in the committee
diff --git a/crates/hotshot/src/traits/election/static_committee_leader_two_views.rs b/crates/hotshot/src/traits/election/static_committee_leader_two_views.rs
index db41aad2ab..bb9574e37e 100644
--- a/crates/hotshot/src/traits/election/static_committee_leader_two_views.rs
+++ b/crates/hotshot/src/traits/election/static_committee_leader_two_views.rs
@@ -16,6 +16,7 @@ use hotshot_types::{
},
PeerConfig,
};
+use utils::anytrace::Result;
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
@@ -140,11 +141,12 @@ impl Membership for StaticCommitteeLeaderForTwoViews::Epoch,
- ) -> TYPES::SignatureKey {
+ ) -> Result<TYPES::SignatureKey> {
let index =
usize::try_from((*view_number / 2) % self.eligible_leaders.len() as u64).unwrap();
let res = self.eligible_leaders[index].clone();
- TYPES::SignatureKey::public_key(&res)
+
+ Ok(TYPES::SignatureKey::public_key(&res))
}
/// Get the total number of nodes in the committee
diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs
index d25fc91762..34e5992f13 100644
--- a/crates/hotshot/src/traits/networking/libp2p_network.rs
+++ b/crates/hotshot/src/traits/networking/libp2p_network.rs
@@ -1025,7 +1025,15 @@ impl ConnectedNetwork for Libp2pNetwork {
{
let future_view = ::View::new(view) + LOOK_AHEAD;
let epoch = ::Epoch::new(epoch);
- let future_leader = membership.leader(future_view, epoch);
+ let future_leader = match membership.leader(future_view, epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ return tracing::info!(
+ "Failed to calculate leader for view {:?}: {e}",
+ future_view
+ );
+ }
+ };
let _ = self
.queue_node_lookup(ViewNumber::new(*future_view), future_leader)
diff --git a/crates/hotshot/src/types/handle.rs b/crates/hotshot/src/types/handle.rs
index cf6c8ffe02..735c219420 100644
--- a/crates/hotshot/src/types/handle.rs
+++ b/crates/hotshot/src/types/handle.rs
@@ -8,7 +8,7 @@
use std::sync::Arc;
-use anyhow::{anyhow, Ok, Result};
+use anyhow::{anyhow, Context, Ok, Result};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
@@ -315,16 +315,20 @@ impl + 'static, V: Versions>
}
/// Wrapper for `HotShotConsensusApi`'s `leader` function
+ ///
+ /// # Errors
+ /// Returns an error if the leader cannot be calculated
#[allow(clippy::unused_async)] // async for API compatibility reasons
pub async fn leader(
&self,
view_number: TYPES::View,
epoch_number: TYPES::Epoch,
- ) -> TYPES::SignatureKey {
+ ) -> Result<TYPES::SignatureKey> {
self.hotshot
.memberships
.quorum_membership
.leader(view_number, epoch_number)
+ .context("Failed to lookup leader")
}
// Below is for testing only:
diff --git a/crates/libp2p-networking/src/network/behaviours/dht/store.rs b/crates/libp2p-networking/src/network/behaviours/dht/store.rs
index 6969ced1ff..cf5c22d61e 100644
--- a/crates/libp2p-networking/src/network/behaviours/dht/store.rs
+++ b/crates/libp2p-networking/src/network/behaviours/dht/store.rs
@@ -37,8 +37,16 @@ impl RecordStore for ValidatedStore
where
K: 'static,
{
- type ProvidedIter<'a> = R::ProvidedIter<'a> where R: 'a, K: 'a;
- type RecordsIter<'a> = R::RecordsIter<'a> where R: 'a, K: 'a;
+ type ProvidedIter<'a>
+ = R::ProvidedIter<'a>
+ where
+ R: 'a,
+ K: 'a;
+ type RecordsIter<'a>
+ = R::RecordsIter<'a>
+ where
+ R: 'a,
+ K: 'a;
// Delegate all `RecordStore` methods except `put` to the inner store
delegate! {
diff --git a/crates/task-impls/Cargo.toml b/crates/task-impls/Cargo.toml
index 2492438ea4..c9a67c0c6a 100644
--- a/crates/task-impls/Cargo.toml
+++ b/crates/task-impls/Cargo.toml
@@ -38,6 +38,7 @@ tagged-base64 = { workspace = true }
time = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
+utils = { path = "../utils" }
vbs = { workspace = true }
vec1 = { workspace = true }
diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs
index fec58d8409..b6f2b843d3 100644
--- a/crates/task-impls/src/consensus/handlers.rs
+++ b/crates/task-impls/src/consensus/handlers.rs
@@ -6,7 +6,6 @@
use std::{sync::Arc, time::Duration};
-use anyhow::{ensure, Context, Result};
use async_broadcast::Sender;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use chrono::Utc;
@@ -19,7 +18,8 @@ use hotshot_types::{
},
vote::HasViewNumber,
};
-use tracing::{debug, error, instrument};
+use tracing::instrument;
+use utils::anytrace::*;
use super::ConsensusTaskState;
use crate::{
@@ -44,9 +44,9 @@ pub(crate) async fn handle_quorum_vote_recv<
ensure!(
task_state
.quorum_membership
- .leader(vote.view_number() + 1, task_state.cur_epoch)
+ .leader(vote.view_number() + 1, task_state.cur_epoch)?
== task_state.public_key,
- format!(
+ info!(
"We are not the leader for view {:?}",
vote.view_number() + 1
)
@@ -63,7 +63,7 @@ pub(crate) async fn handle_quorum_vote_recv<
sender,
&task_state.upgrade_lock,
)
- .await;
+ .await?;
Ok(())
}
@@ -83,9 +83,9 @@ pub(crate) async fn handle_timeout_vote_recv<
ensure!(
task_state
.timeout_membership
- .leader(vote.view_number() + 1, task_state.cur_epoch)
+ .leader(vote.view_number() + 1, task_state.cur_epoch)?
== task_state.public_key,
- format!(
+ info!(
"We are not the leader for view {:?}",
vote.view_number() + 1
)
@@ -102,7 +102,7 @@ pub(crate) async fn handle_timeout_vote_recv<
sender,
&task_state.upgrade_lock,
)
- .await;
+ .await?;
Ok(())
}
@@ -124,7 +124,7 @@ pub(crate) async fn handle_view_change<
);
let old_view_number = task_state.cur_view;
- debug!("Updating view from {old_view_number:?} to {new_view_number:?}");
+ tracing::debug!("Updating view from {old_view_number:?} to {new_view_number:?}");
// Move this node to the next view
task_state.cur_view = new_view_number;
@@ -138,7 +138,7 @@ pub(crate) async fn handle_view_change<
.clone();
if let Some(cert) = decided_upgrade_certificate_read {
if new_view_number == cert.data.new_version_first_view {
- error!(
+ tracing::error!(
"Version upgraded based on a decided upgrade cert: {:?}",
cert
);
@@ -177,7 +177,7 @@ pub(crate) async fn handle_view_change<
let cur_view_time = Utc::now().timestamp();
if task_state
.quorum_membership
- .leader(old_view_number, task_state.cur_epoch)
+ .leader(old_view_number, task_state.cur_epoch)?
== task_state.public_key
{
#[allow(clippy::cast_precision_loss)]
@@ -228,7 +228,7 @@ pub(crate) async fn handle_timeout
task_state
.timeout_membership
.has_stake(&task_state.public_key, task_state.cur_epoch),
- format!("We were not chosen for the consensus committee for view {view_number:?}")
+ debug!("We were not chosen for the consensus committee for view {view_number:?}")
);
let vote = TimeoutVote::create_signed_vote(
@@ -239,7 +239,8 @@ pub(crate) async fn handle_timeout
&task_state.upgrade_lock,
)
.await
- .context("Failed to sign TimeoutData")?;
+ .wrap()
+ .context(error!("Failed to sign TimeoutData"))?;
broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), sender).await;
broadcast_event(
@@ -251,7 +252,7 @@ pub(crate) async fn handle_timeout
)
.await;
- debug!(
+ tracing::debug!(
"We did not receive evidence for view {} in time, sending timeout vote for that view!",
*view_number
);
@@ -274,7 +275,7 @@ pub(crate) async fn handle_timeout
.add(1);
if task_state
.quorum_membership
- .leader(view_number, task_state.cur_epoch)
+ .leader(view_number, task_state.cur_epoch)?
== task_state.public_key
{
task_state
diff --git a/crates/task-impls/src/consensus/mod.rs b/crates/task-impls/src/consensus/mod.rs
index fb1ec86fca..edff8f6078 100644
--- a/crates/task-impls/src/consensus/mod.rs
+++ b/crates/task-impls/src/consensus/mod.rs
@@ -6,7 +6,6 @@
use std::sync::Arc;
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
@@ -27,6 +26,7 @@ use hotshot_types::{
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::instrument;
+use utils::anytrace::Result;
use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
@@ -106,7 +106,7 @@ impl, V: Versions> ConsensusTaskSt
&mut self,
event: Arc>,
sender: Sender>>,
- ) {
+ ) -> Result<()> {
match event.as_ref() {
HotShotEvent::QuorumVoteRecv(ref vote) => {
if let Err(e) =
@@ -149,6 +149,8 @@ impl, V: Versions> ConsensusTaskSt
}
_ => {}
}
+
+ Ok(())
}
}
@@ -164,9 +166,7 @@ impl, V: Versions> TaskState
sender: &Sender>,
_receiver: &Receiver>,
) -> Result<()> {
- self.handle(event, sender.clone()).await;
-
- Ok(())
+ self.handle(event, sender.clone()).await
}
/// Joins all subtasks.
diff --git a/crates/task-impls/src/consensus2/handlers.rs b/crates/task-impls/src/consensus2/handlers.rs
new file mode 100644
index 0000000000..ec87f1b159
--- /dev/null
+++ b/crates/task-impls/src/consensus2/handlers.rs
@@ -0,0 +1,280 @@
+// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
+// This file is part of the HotShot repository.
+
+// You should have received a copy of the MIT License
+// along with the HotShot repository. If not, see <https://mit-license.org/>.
+
+use std::{sync::Arc, time::Duration};
+
+use utils::anytrace::{ensure, Context, Result};
+use async_broadcast::Sender;
+use async_compatibility_layer::art::{async_sleep, async_spawn};
+use chrono::Utc;
+use hotshot_types::{
+ event::{Event, EventType},
+ simple_vote::{QuorumVote, TimeoutData, TimeoutVote},
+ traits::{
+ election::Membership,
+ node_implementation::{ConsensusTime, NodeImplementation, NodeType},
+ },
+ vote::HasViewNumber,
+};
+use tracing::{debug, error, instrument};
+
+use super::Consensus2TaskState;
+use crate::{
+ consensus2::Versions,
+ events::HotShotEvent,
+ helpers::{broadcast_event, cancel_task},
+ vote_collection::handle_vote,
+};
+
+/// Handle a `QuorumVoteRecv` event.
+pub(crate) async fn handle_quorum_vote_recv<
+ TYPES: NodeType,
+ I: NodeImplementation,
+ V: Versions,
+>(
+ vote: &QuorumVote,
+ event: Arc>,
+ sender: &Sender>>,
+ task_state: &mut Consensus2TaskState,
+) -> Result<()> {
+ // Are we the leader for this view?
+ ensure!(
+ task_state
+ .quorum_membership
+ .leader(vote.view_number() + 1)?
+ == task_state.public_key,
+ format!(
+ "We are not the leader for view {:?}",
+ vote.view_number() + 1
+ )
+ );
+
+ handle_vote(
+ &mut task_state.vote_collectors,
+ vote,
+ task_state.public_key.clone(),
+ &task_state.quorum_membership,
+ task_state.id,
+ &event,
+ sender,
+ &task_state.upgrade_lock,
+ )
+ .await?;
+
+ Ok(())
+}
+
+/// Handle a `TimeoutVoteRecv` event.
+pub(crate) async fn handle_timeout_vote_recv<
+ TYPES: NodeType,
+ I: NodeImplementation,
+ V: Versions,
+>(
+ vote: &TimeoutVote,
+ event: Arc>,
+ sender: &Sender>>,
+ task_state: &mut Consensus2TaskState,
+) -> Result<()> {
+ // Are we the leader for this view?
+ ensure!(
+ task_state
+ .timeout_membership
+ .leader(vote.view_number() + 1)?
+ == task_state.public_key,
+ format!(
+ "We are not the leader for view {:?}",
+ vote.view_number() + 1
+ )
+ );
+
+ handle_vote(
+ &mut task_state.timeout_vote_collectors,
+ vote,
+ task_state.public_key.clone(),
+ &task_state.quorum_membership,
+ task_state.id,
+ &event,
+ sender,
+ &task_state.upgrade_lock,
+ )
+ .await?;
+
+ Ok(())
+}
+
+/// Handle a `ViewChange` event.
+#[instrument(skip_all)]
+pub(crate) async fn handle_view_change<
+ TYPES: NodeType,
+ I: NodeImplementation,
+ V: Versions,
+>(
+ new_view_number: TYPES::Time,
+ sender: &Sender>>,
+ task_state: &mut Consensus2TaskState,
+) -> Result<()> {
+ ensure!(
+ new_view_number > task_state.cur_view,
+ "New view is not larger than the current view"
+ );
+
+ let old_view_number = task_state.cur_view;
+ debug!("Updating view from {old_view_number:?} to {new_view_number:?}");
+
+ // Move this node to the next view
+ task_state.cur_view = new_view_number;
+
+ // If we have a decided upgrade certificate, the protocol version may also have been upgraded.
+ let decided_upgrade_certificate_read = task_state
+ .upgrade_lock
+ .decided_upgrade_certificate
+ .read()
+ .await
+ .clone();
+ if let Some(cert) = decided_upgrade_certificate_read {
+ if new_view_number == cert.data.new_version_first_view {
+ error!(
+ "Version upgraded based on a decided upgrade cert: {:?}",
+ cert
+ );
+ }
+ }
+
+ // Spawn a timeout task if we did actually update view
+ let timeout = task_state.timeout;
+ let new_timeout_task = async_spawn({
+ let stream = sender.clone();
+ // Nuance: We timeout on the view + 1 here because that means that we have
+ // not seen evidence to transition to this new view
+ let view_number = new_view_number + 1;
+ async move {
+ async_sleep(Duration::from_millis(timeout)).await;
+ broadcast_event(
+ Arc::new(HotShotEvent::Timeout(TYPES::Time::new(*view_number))),
+ &stream,
+ )
+ .await;
+ }
+ });
+
+ // Cancel the old timeout task
+ cancel_task(std::mem::replace(
+ &mut task_state.timeout_task,
+ new_timeout_task,
+ ))
+ .await;
+
+ let consensus = task_state.consensus.read().await;
+ consensus
+ .metrics
+ .current_view
+ .set(usize::try_from(task_state.cur_view.u64()).unwrap());
+ let cur_view_time = Utc::now().timestamp();
+ if task_state.quorum_membership.leader(old_view_number)? == task_state.public_key {
+ #[allow(clippy::cast_precision_loss)]
+ consensus
+ .metrics
+ .view_duration_as_leader
+ .add_point((cur_view_time - task_state.cur_view_time) as f64);
+ }
+ task_state.cur_view_time = cur_view_time;
+
+ // Do the comparison before the subtraction to avoid potential overflow, since
+ // `last_decided_view` may be greater than `cur_view` if the node is catching up.
+ if usize::try_from(task_state.cur_view.u64()).unwrap()
+ > usize::try_from(task_state.last_decided_view.u64()).unwrap()
+ {
+ consensus.metrics.number_of_views_since_last_decide.set(
+ usize::try_from(task_state.cur_view.u64()).unwrap()
+ - usize::try_from(task_state.last_decided_view.u64()).unwrap(),
+ );
+ }
+
+ broadcast_event(
+ Event {
+ view_number: old_view_number,
+ event: EventType::ViewFinished {
+ view_number: old_view_number,
+ },
+ },
+ &task_state.output_event_stream,
+ )
+ .await;
+ Ok(())
+}
+
+/// Handle a `Timeout` event.
+#[instrument(skip_all)]
+pub(crate) async fn handle_timeout, V: Versions>(
+ view_number: TYPES::Time,
+ sender: &Sender>>,
+ task_state: &mut Consensus2TaskState,
+) -> Result<()> {
+ ensure!(
+ task_state.cur_view < view_number,
+ "Timeout event is for an old view"
+ );
+
+ ensure!(
+ task_state
+ .timeout_membership
+ .has_stake(&task_state.public_key),
+ format!("We were not chosen for the consensus committee for view {view_number:?}")
+ );
+
+ let vote = TimeoutVote::create_signed_vote(
+ TimeoutData:: { view: view_number },
+ view_number,
+ &task_state.public_key,
+ &task_state.private_key,
+ &task_state.upgrade_lock,
+ )
+ .await
+ .context("Failed to sign TimeoutData")?;
+
+ broadcast_event(Arc::new(HotShotEvent::TimeoutVoteSend(vote)), sender).await;
+ broadcast_event(
+ Event {
+ view_number,
+ event: EventType::ViewTimeout { view_number },
+ },
+ &task_state.output_event_stream,
+ )
+ .await;
+
+ debug!(
+ "We did not receive evidence for view {} in time, sending timeout vote for that view!",
+ *view_number
+ );
+
+ broadcast_event(
+ Event {
+ view_number,
+ event: EventType::ReplicaViewTimeout { view_number },
+ },
+ &task_state.output_event_stream,
+ )
+ .await;
+
+ task_state
+ .consensus
+ .read()
+ .await
+ .metrics
+ .number_of_timeouts
+ .add(1);
+ if task_state.quorum_membership.leader(view_number)? == task_state.public_key {
+ task_state
+ .consensus
+ .read()
+ .await
+ .metrics
+ .number_of_timeouts_as_leader
+ .add(1);
+ }
+
+ Ok(())
+}
diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs
index 2e7c1357ff..5ccb50e091 100644
--- a/crates/task-impls/src/da.rs
+++ b/crates/task-impls/src/da.rs
@@ -6,7 +6,6 @@
use std::{marker::PhantomData, sync::Arc};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLock;
@@ -35,10 +34,11 @@ use hotshot_types::{
use sha2::{Digest, Sha256};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::spawn_blocking;
-use tracing::{debug, error, info, instrument, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use crate::{
- events::{HotShotEvent, HotShotTaskCompleted},
+ events::HotShotEvent,
helpers::broadcast_event,
vote_collection::{handle_vote, VoteCollectorsMap},
};
@@ -94,11 +94,11 @@ impl, V: Versions> DaTaskState>,
event_stream: Sender>>,
- ) -> Option {
+ ) -> Result<()> {
match event.as_ref() {
HotShotEvent::DaProposalRecv(proposal, sender) => {
let sender = sender.clone();
- debug!(
+ tracing::debug!(
"DA proposal received for view: {:?}",
proposal.data.view_number()
);
@@ -111,35 +111,40 @@ impl, V: Versions> DaTaskState= self.cur_view - 1,
+ "Throwing away DA proposal that is more than one view older"
+ );
- if self
- .consensus
- .read()
- .await
- .saved_payloads()
- .contains_key(&view)
- {
- warn!("Received DA proposal for view {:?} but we already have a payload for that view. Throwing it away", view);
- return None;
- }
+ ensure!(
+ !self
+ .consensus
+ .read()
+ .await
+ .saved_payloads()
+ .contains_key(&view),
+ info!(
+ "Received DA proposal for view {:?} but we already have a payload for that view. Throwing it away",
+ view
+ )
+ );
let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
- // ED Is this the right leader?
- let view_leader_key = self.da_membership.leader(view, self.cur_epoch);
- if view_leader_key != sender {
- error!("DA proposal doesn't have expected leader key for view {} \n DA proposal is: {:?}", *view, proposal.data.clone());
- return None;
- }
+ let view_leader_key = self.da_membership.leader(view, self.cur_epoch)?;
+ ensure!(
+ view_leader_key == sender,
+ warn!(
+ "DA proposal doesn't have expected leader key for view {} \n DA proposal is: {:?}",
+ *view,
+ proposal.data.clone()
+ )
+ );
- if !view_leader_key.validate(&proposal.signature, &encoded_transactions_hash) {
- error!("Could not verify proposal.");
- return None;
- }
+ ensure!(
+ view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
+ warn!("Could not verify proposal.")
+ );
broadcast_event(
Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
@@ -149,10 +154,15 @@ impl, V: Versions> DaTaskState {
let curr_view = self.consensus.read().await.cur_view();
- if curr_view > proposal.data.view_number() + 1 {
- tracing::debug!("Validated DA proposal for prior view but it's too old now Current view {:?}, DA Proposal view {:?}", curr_view, proposal.data.view_number());
- return None;
- }
+ ensure!(
+ curr_view <= proposal.data.view_number() + 1,
+ debug!(
+ "Validated DA proposal for prior view but it's too old now Current view {:?}, DA Proposal view {:?}",
+ curr_view,
+ proposal.data.view_number()
+ )
+ );
+
// Proposal is fresh and valid, notify the application layer
broadcast_event(
Event {
@@ -166,23 +176,23 @@ impl, V: Versions> DaTaskState, V: Versions> DaTaskState, V: Versions> DaTaskState, V: Versions> DaTaskState {
- debug!("DA vote recv, Main Task {:?}", vote.view_number());
+ tracing::debug!("DA vote recv, Main Task {:?}", vote.view_number());
// Check if we are the leader and the vote is from the sender.
let view = vote.view_number();
- if self.da_membership.leader(view, self.cur_epoch) != self.public_key {
- error!("We are not the DA committee leader for view {} are we leader for next view? {}", *view, self.da_membership.leader(view + 1, self.cur_epoch) == self.public_key);
- return None;
- }
+
+ ensure!(
+ self.da_membership.leader(view, self.cur_epoch)? == self.public_key,
+ debug!(
+ "We are not the DA committee leader for view {} are we leader for next view? {}",
+ *view,
+ self.da_membership.leader(view + 1, self.cur_epoch)? == self.public_key
+ )
+ );
handle_vote(
&mut self.vote_collectors,
@@ -284,26 +295,29 @@ impl, V: Versions> DaTaskState {
let view = *view;
- if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
- return None;
- }
+
+ ensure!(
+ *self.cur_view < *view,
+ info!("Received a view change to an older view.")
+ );
if *view - *self.cur_view > 1 {
- info!("View changed by more than 1 going to view {:?}", view);
+ tracing::info!("View changed by more than 1 going to view {:?}", view);
}
self.cur_view = view;
// If we are not the next leader (DA leader for this view) immediately exit
- if self.da_membership.leader(self.cur_view + 1, self.cur_epoch) != self.public_key {
- return None;
- }
- debug!("Polling for DA votes for view {}", *self.cur_view + 1);
+ ensure!(
+ self.da_membership
+ .leader(self.cur_view + 1, self.cur_epoch)?
+ == self.public_key
+ );
- return None;
+ tracing::debug!("Polling for DA votes for view {}", *self.cur_view + 1);
}
HotShotEvent::BlockRecv(packed_bundle) => {
let PackedBundle:: {
@@ -318,12 +332,9 @@ impl, V: Versions> DaTaskState = DaProposal {
encoded_transactions: Arc::clone(encoded_transactions),
@@ -347,14 +358,9 @@ impl, V: Versions> DaTaskState {
- error!("Shutting down because of shutdown signal!");
- return Some(HotShotTaskCompleted);
- }
_ => {}
}
- None
+ Ok(())
}
}
@@ -371,9 +377,7 @@ impl, V: Versions> TaskState
sender: &Sender>,
_receiver: &Receiver>,
) -> Result<()> {
- self.handle(event, sender.clone()).await;
-
- Ok(())
+ self.handle(event, sender.clone()).await
}
async fn cancel_subtasks(&mut self) {}
diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs
index 268b43b2d3..a3cd151971 100644
--- a/crates/task-impls/src/helpers.rs
+++ b/crates/task-impls/src/helpers.rs
@@ -10,7 +10,6 @@ use std::{
sync::Arc,
};
-use anyhow::{bail, ensure, Context, Result};
use async_broadcast::{Receiver, SendError, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn, async_timeout};
use async_lock::RwLock;
@@ -39,7 +38,8 @@ use hotshot_types::{
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
-use tracing::{debug, info, instrument, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use crate::{
events::HotShotEvent, quorum_proposal_recv::QuorumProposalRecvTaskState,
@@ -70,7 +70,9 @@ pub(crate) async fn fetch_proposal(
let signature = TYPES::SignatureKey::sign(
&sender_private_key,
signed_proposal_request.commit().as_ref(),
- )?;
+ )
+ .wrap()
+ .context(error!("Failed to sign proposal. This should never happen."))?;
// First, broadcast that we need a proposal to the current leader
broadcast_event(
@@ -298,9 +300,11 @@ pub async fn decide_from_proposal(
if let Some(cert) = leaf.upgrade_certificate() {
if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
if cert.data.decide_by < view_number {
- warn!("Failed to decide an upgrade certificate in time. Ignoring.");
+ tracing::warn!(
+ "Failed to decide an upgrade certificate in time. Ignoring."
+ );
} else {
- info!("Reached decide on upgrade certificate: {:?}", cert);
+ tracing::info!("Reached decide on upgrade certificate: {:?}", cert);
res.decided_upgrade_cert = Some(cert.clone());
}
}
@@ -346,13 +350,13 @@ pub async fn decide_from_proposal(
true
},
) {
- debug!("Leaf ascension failed; error={e}");
+ tracing::debug!("Leaf ascension failed; error={e}");
}
res
}
-/// Gets the parent leaf and state from the parent of a proposal, returning an [`anyhow::Error`] if not.
+/// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if not.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state(
@@ -367,8 +371,11 @@ pub(crate) async fn parent_leaf_and_state(
) -> Result<(Leaf, Arc<::ValidatedState>)> {
let current_epoch = consensus.read().await.cur_epoch();
ensure!(
- quorum_membership.leader(next_proposal_view_number, current_epoch) == public_key,
- "Somehow we formed a QC but are not the leader for the next view {next_proposal_view_number:?}",
+ quorum_membership.leader(next_proposal_view_number, current_epoch)? == public_key,
+ info!(
+ "Somehow we formed a QC but are not the leader for the next view {:?}",
+ next_proposal_view_number
+ )
);
let parent_view_number = consensus.read().await.high_qc().view_number();
if !consensus
@@ -388,22 +395,21 @@ pub(crate) async fn parent_leaf_and_state(
upgrade_lock,
)
.await
- .context("Failed to fetch proposal")?;
+ .context(info!("Failed to fetch proposal"))?;
}
let consensus_reader = consensus.read().await;
let parent_view_number = consensus_reader.high_qc().view_number();
let parent_view = consensus_reader.validated_state_map().get(&parent_view_number).context(
- format!("Couldn't find parent view in state map, waiting for replica to see proposal; parent_view_number: {}", *parent_view_number)
+ debug!("Couldn't find parent view in state map, waiting for replica to see proposal; parent_view_number: {}", *parent_view_number)
)?;
- // Leaf hash in view inner does not match high qc hash - Why?
let (leaf_commitment, state) = parent_view.leaf_and_state().context(
- format!("Parent of high QC points to a view without a proposal; parent_view_number: {parent_view_number:?}, parent_view {parent_view:?}")
+ info!("Parent of high QC points to a view without a proposal; parent_view_number: {parent_view_number:?}, parent_view {parent_view:?}")
)?;
if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
// NOTE: This happens on the genesis block
- debug!(
+ tracing::debug!(
"They don't equal: {:?} {:?}",
leaf_commitment,
consensus_reader.high_qc().data().leaf_commit
@@ -413,7 +419,7 @@ pub(crate) async fn parent_leaf_and_state(
let leaf = consensus_reader
.saved_leaves()
.get(&leaf_commitment)
- .context("Failed to find high QC of parent")?;
+ .context(info!("Failed to find high QC of parent"))?;
Ok((leaf.clone(), Arc::clone(state)))
}
@@ -531,7 +537,7 @@ pub async fn validate_proposal_safety_and_liveness<
.await;
}
- format!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", read_consensus.high_qc(), proposal.data.clone(), read_consensus.locked_view())
+ error!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", read_consensus.high_qc(), proposal.data.clone(), read_consensus.locked_view())
});
}
@@ -542,7 +548,9 @@ pub async fn validate_proposal_safety_and_liveness<
.write()
.await
.append_proposal(&proposal)
- .await?;
+ .await
+ .wrap()
+ .context(error!("Failed to append proposal in storage!"))?;
// We accept the proposal, notify the application layer
broadcast_event(
@@ -603,7 +611,7 @@ pub async fn validate_proposal_view_and_certs<
// Verify a timeout certificate OR a view sync certificate exists and is valid.
if proposal.data.justify_qc.view_number() != view - 1 {
let received_proposal_cert =
- proposal.data.proposal_certificate.clone().context(format!(
+ proposal.data.proposal_certificate.clone().context(debug!(
"Quorum proposal for view {} needed a timeout or view sync certificate, but did not have one",
*view
))?;
@@ -667,7 +675,7 @@ pub async fn validate_proposal_view_and_certs<
/// `timeout_task` which are updated during the operation of the function.
///
/// # Errors
-/// Returns an [`anyhow::Error`] when the new view is not greater than the current view.
+/// Returns an [`utils::anytrace::Error`] when the new view is not greater than the current view.
pub(crate) async fn update_view, V: Versions>(
new_view: TYPES::View,
event_stream: &Sender>>,
@@ -680,14 +688,14 @@ pub(crate) async fn update_view, V
let is_old_view_leader = task_state
.quorum_membership
- .leader(task_state.cur_view, task_state.cur_epoch)
+ .leader(task_state.cur_view, task_state.cur_epoch)?
== task_state.public_key;
let old_view = task_state.cur_view;
- debug!("Updating view from {} to {}", *old_view, *new_view);
+ tracing::debug!("Updating view from {} to {}", *old_view, *new_view);
if *old_view / 100 != *new_view / 100 {
- info!("Progress: entered view {:>6}", *new_view);
+ tracing::info!("Progress: entered view {:>6}", *new_view);
}
task_state.cur_view = new_view;
@@ -792,15 +800,3 @@ pub async fn broadcast_event(event: E, sender: &Send
}
}
}
-
-/// Utilities to print anyhow logs.
-pub trait AnyhowTracing {
- /// Print logs as debug
- fn err_as_debug(self);
-}
-
-impl AnyhowTracing for anyhow::Result {
- fn err_as_debug(self) {
- let _ = self.inspect_err(|e| tracing::debug!("{}", format!("{:?}", e)));
- }
-}
diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs
index 5b897c7b4f..d8ef530b54 100644
--- a/crates/task-impls/src/network.rs
+++ b/crates/task-impls/src/network.rs
@@ -6,7 +6,6 @@
use std::{collections::HashMap, sync::Arc};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLock;
@@ -31,7 +30,8 @@ use hotshot_types::{
},
vote::{HasViewNumber, Vote},
};
-use tracing::{error, instrument, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use crate::{
events::{HotShotEvent, HotShotTaskCompleted},
@@ -104,7 +104,7 @@ impl NetworkMessageTaskState {
HotShotEvent::UpgradeProposalRecv(message, sender)
}
GeneralConsensusMessage::UpgradeVote(message) => {
- error!("Received upgrade vote!");
+ tracing::error!("Received upgrade vote!");
HotShotEvent::UpgradeVoteRecv(message)
}
},
@@ -270,7 +270,7 @@ impl<
let serialized_message = match self.upgrade_lock.serialize(&message).await {
Ok(serialized) => serialized,
Err(e) => {
- error!("Failed to serialize message: {}", e);
+ tracing::error!("Failed to serialize message: {}", e);
continue;
}
};
@@ -295,7 +295,7 @@ impl<
}
match net.vid_broadcast_message(messages).await {
Ok(()) => {}
- Err(e) => warn!("Failed to send message from network task: {:?}", e),
+ Err(e) => tracing::warn!("Failed to send message from network task: {:?}", e),
}
});
@@ -308,16 +308,16 @@ impl<
storage: Arc>,
state: Arc>>,
view: ::View,
- ) -> Result<(), ()> {
+ ) -> std::result::Result<(), ()> {
if let Some(action) = maybe_action {
if !state.write().await.update_action(action, view) {
- warn!("Already actioned {:?} in view {:?}", action, view);
+ tracing::warn!("Already actioned {:?} in view {:?}", action, view);
return Err(());
}
match storage.write().await.record_action(view, action).await {
Ok(()) => Ok(()),
Err(e) => {
- warn!("Not Sending {:?} because of storage error: {:?}", action, e);
+ tracing::warn!("Not Sending {:?} because of storage error: {:?}", action, e);
Err(())
}
}
@@ -355,15 +355,25 @@ impl<
// ED Each network task is subscribed to all these message types. Need filters per network task
HotShotEvent::QuorumVoteSend(vote) => {
*maybe_action = Some(HotShotAction::Vote);
+ let view_number = vote.view_number() + 1;
+ let leader = match self.quorum_membership.leader(view_number, self.epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::warn!(
+ "Failed to calculate leader for view number {:?}. Error: {:?}",
+ view_number,
+ e
+ );
+ return None;
+ }
+ };
+
Some((
vote.signing_key(),
MessageKind::::from_consensus_message(SequencingMessage::General(
GeneralConsensusMessage::Vote(vote.clone()),
)),
- TransmitType::Direct(
- self.quorum_membership
- .leader(vote.view_number() + 1, self.epoch),
- ),
+ TransmitType::Direct(leader),
))
}
HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
@@ -396,15 +406,25 @@ impl<
}
HotShotEvent::DaVoteSend(vote) => {
*maybe_action = Some(HotShotAction::DaVote);
+ let view_number = vote.view_number();
+ let leader = match self.quorum_membership.leader(view_number, self.epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::warn!(
+ "Failed to calculate leader for view number {:?}. Error: {:?}",
+ view_number,
+ e
+ );
+ return None;
+ }
+ };
+
Some((
vote.signing_key(),
MessageKind::::from_consensus_message(SequencingMessage::Da(
DaConsensusMessage::DaVote(vote.clone()),
)),
- TransmitType::Direct(
- self.quorum_membership
- .leader(vote.view_number(), self.epoch),
- ),
+ TransmitType::Direct(leader),
))
}
HotShotEvent::DacSend(certificate, sender) => {
@@ -417,36 +437,72 @@ impl<
TransmitType::Broadcast,
))
}
- HotShotEvent::ViewSyncPreCommitVoteSend(vote) => Some((
- vote.signing_key(),
- MessageKind::::from_consensus_message(SequencingMessage::General(
- GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone()),
- )),
- TransmitType::Direct(
- self.quorum_membership
- .leader(vote.view_number() + vote.date().relay, self.epoch),
- ),
- )),
- HotShotEvent::ViewSyncCommitVoteSend(vote) => Some((
- vote.signing_key(),
- MessageKind::::from_consensus_message(SequencingMessage::General(
- GeneralConsensusMessage::ViewSyncCommitVote(vote.clone()),
- )),
- TransmitType::Direct(
- self.quorum_membership
- .leader(vote.view_number() + vote.date().relay, self.epoch),
- ),
- )),
- HotShotEvent::ViewSyncFinalizeVoteSend(vote) => Some((
- vote.signing_key(),
- MessageKind::::from_consensus_message(SequencingMessage::General(
- GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone()),
- )),
- TransmitType::Direct(
- self.quorum_membership
- .leader(vote.view_number() + vote.date().relay, self.epoch),
- ),
- )),
+ HotShotEvent::ViewSyncPreCommitVoteSend(vote) => {
+ let view_number = vote.view_number() + vote.date().relay;
+ let leader = match self.quorum_membership.leader(view_number, self.epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::warn!(
+ "Failed to calculate leader for view number {:?}. Error: {:?}",
+ view_number,
+ e
+ );
+ return None;
+ }
+ };
+
+ Some((
+ vote.signing_key(),
+ MessageKind::::from_consensus_message(SequencingMessage::General(
+ GeneralConsensusMessage::ViewSyncPreCommitVote(vote.clone()),
+ )),
+ TransmitType::Direct(leader),
+ ))
+ }
+ HotShotEvent::ViewSyncCommitVoteSend(vote) => {
+ let view_number = vote.view_number() + vote.date().relay;
+ let leader = match self.quorum_membership.leader(view_number, self.epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::warn!(
+ "Failed to calculate leader for view number {:?}. Error: {:?}",
+ view_number,
+ e
+ );
+ return None;
+ }
+ };
+
+ Some((
+ vote.signing_key(),
+ MessageKind::::from_consensus_message(SequencingMessage::General(
+ GeneralConsensusMessage::ViewSyncCommitVote(vote.clone()),
+ )),
+ TransmitType::Direct(leader),
+ ))
+ }
+ HotShotEvent::ViewSyncFinalizeVoteSend(vote) => {
+ let view_number = vote.view_number() + vote.date().relay;
+ let leader = match self.quorum_membership.leader(view_number, self.epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::warn!(
+ "Failed to calculate leader for view number {:?}. Error: {:?}",
+ view_number,
+ e
+ );
+ return None;
+ }
+ };
+
+ Some((
+ vote.signing_key(),
+ MessageKind::::from_consensus_message(SequencingMessage::General(
+ GeneralConsensusMessage::ViewSyncFinalizeVote(vote.clone()),
+ )),
+ TransmitType::Direct(leader),
+ ))
+ }
HotShotEvent::ViewSyncPreCommitCertificate2Send(certificate, sender) => Some((
sender,
MessageKind::::from_consensus_message(SequencingMessage::General(
@@ -470,15 +526,24 @@ impl<
)),
HotShotEvent::TimeoutVoteSend(vote) => {
*maybe_action = Some(HotShotAction::Vote);
+ let view_number = vote.view_number() + 1;
+ let leader = match self.quorum_membership.leader(view_number, self.epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::warn!(
+ "Failed to calculate leader for view number {:?}. Error: {:?}",
+ view_number,
+ e
+ );
+ return None;
+ }
+ };
Some((
vote.signing_key(),
MessageKind::::from_consensus_message(SequencingMessage::General(
GeneralConsensusMessage::TimeoutVote(vote.clone()),
)),
- TransmitType::Direct(
- self.quorum_membership
- .leader(vote.view_number() + 1, self.epoch),
- ),
+ TransmitType::Direct(leader),
))
}
HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
@@ -489,16 +554,25 @@ impl<
TransmitType::Broadcast,
)),
HotShotEvent::UpgradeVoteSend(vote) => {
- error!("Sending upgrade vote!");
+ tracing::error!("Sending upgrade vote!");
+ let view_number = vote.view_number();
+ let leader = match self.quorum_membership.leader(view_number, self.epoch) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::warn!(
+ "Failed to calculate leader for view number {:?}. Error: {:?}",
+ view_number,
+ e
+ );
+ return None;
+ }
+ };
Some((
vote.signing_key(),
MessageKind::::from_consensus_message(SequencingMessage::General(
GeneralConsensusMessage::UpgradeVote(vote.clone()),
)),
- TransmitType::Direct(
- self.quorum_membership
- .leader(vote.view_number(), self.epoch),
- ),
+ TransmitType::Direct(leader),
))
}
HotShotEvent::ViewChange(view) => {
@@ -581,7 +655,7 @@ impl<
let serialized_message = match upgrade_lock.serialize(&message).await {
Ok(serialized) => serialized,
Err(e) => {
- error!("Failed to serialize message: {}", e);
+ tracing::error!("Failed to serialize message: {}", e);
return;
}
};
@@ -606,7 +680,7 @@ impl<
match transmit_result {
Ok(()) => {}
- Err(e) => warn!("Failed to send message task: {:?}", e),
+ Err(e) => tracing::warn!("Failed to send message task: {:?}", e),
}
});
}
diff --git a/crates/task-impls/src/quorum_proposal/handlers.rs b/crates/task-impls/src/quorum_proposal/handlers.rs
index bcdc568e18..6ee2c2579e 100644
--- a/crates/task-impls/src/quorum_proposal/handlers.rs
+++ b/crates/task-impls/src/quorum_proposal/handlers.rs
@@ -9,7 +9,6 @@
use std::{marker::PhantomData, sync::Arc, time::Duration};
-use anyhow::{ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
@@ -26,7 +25,8 @@ use hotshot_types::{
block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey,
},
};
-use tracing::{debug, error, instrument};
+use tracing::instrument;
+use utils::anytrace::*;
use vbs::version::StaticVersionType;
use crate::{
@@ -182,7 +182,8 @@ impl ProposalDependencyHandle {
version,
)
.await
- .context("Failed to construct legacy block header")?
+ .wrap()
+ .context(warn!("Failed to construct legacy block header"))?
} else {
TYPES::BlockHeader::new_marketplace(
state.as_ref(),
@@ -197,7 +198,8 @@ impl ProposalDependencyHandle {
version,
)
.await
- .context("Failed to construct marketplace block header")?
+ .wrap()
+ .context(warn!("Failed to construct marketplace block header"))?
};
let proposal = QuorumProposal {
@@ -218,14 +220,15 @@ impl ProposalDependencyHandle {
&self.private_key,
proposed_leaf.commit(&self.upgrade_lock).await.as_ref(),
)
- .context("Failed to compute proposed_leaf.commit()")?;
+ .wrap()
+ .context(error!("Failed to compute proposed_leaf.commit()"))?;
let message = Proposal {
data: proposal,
signature,
_pd: PhantomData,
};
- debug!(
+ tracing::debug!(
"Sending proposal for view {:?}",
proposed_leaf.view_number(),
);
@@ -335,14 +338,14 @@ impl HandleDepOutput for ProposalDependencyHandle<
}
if commit_and_metadata.is_none() {
- error!(
+ tracing::error!(
"Somehow completed the proposal dependency task without a commitment and metadata"
);
return;
}
if vid_share.is_none() {
- error!("Somehow completed the proposal dependency task without a VID share");
+ tracing::error!("Somehow completed the proposal dependency task without a VID share");
return;
}
@@ -362,7 +365,7 @@ impl HandleDepOutput for ProposalDependencyHandle<
)
.await
{
- error!("Failed to publish proposal; error = {e:#}");
+ tracing::error!("Failed to publish proposal; error = {e:#}");
}
}
}
diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs
index 7390427f09..022b608698 100644
--- a/crates/task-impls/src/quorum_proposal/mod.rs
+++ b/crates/task-impls/src/quorum_proposal/mod.rs
@@ -6,7 +6,6 @@
use std::{collections::HashMap, sync::Arc};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
@@ -33,7 +32,8 @@ use hotshot_types::{
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
-use tracing::{debug, instrument, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use self::handlers::{ProposalDependency, ProposalDependencyHandle};
use crate::{
@@ -171,7 +171,9 @@ impl, V: Versions>
};
let valid = event_view == view_number;
if valid {
- debug!("Dependency {dependency_type:?} is complete for view {event_view:?}!",);
+ tracing::debug!(
+ "Dependency {dependency_type:?} is complete for view {event_view:?}!",
+ );
}
valid
}),
@@ -288,24 +290,27 @@ impl, V: Versions>
event_receiver: Receiver>>,
event_sender: Sender>>,
event: Arc>,
- ) {
+ ) -> Result<()> {
// Don't even bother making the task if we are not entitled to propose anyway.
- if self.quorum_membership.leader(view_number, epoch_number) != self.public_key {
- tracing::trace!("We are not the leader of the next view");
- return;
- }
+ ensure!(
+ self.quorum_membership.leader(view_number, epoch_number)? == self.public_key,
+ debug!("We are not the leader of the next view")
+ );
// Don't try to propose twice for the same view.
- if view_number <= self.latest_proposed_view {
- tracing::trace!("We have already proposed for this view");
- return;
- }
+ ensure!(
+ view_number > self.latest_proposed_view,
+ "We have already proposed for this view"
+ );
- debug!("Attempting to make dependency task for view {view_number:?} and event {event:?}");
- if self.proposal_dependencies.contains_key(&view_number) {
- debug!("Task already exists");
- return;
- }
+ tracing::debug!(
+ "Attempting to make dependency task for view {view_number:?} and event {event:?}"
+ );
+
+ ensure!(
+ !self.proposal_dependencies.contains_key(&view_number),
+ "Task already exists"
+ );
let dependency_chain =
self.create_and_complete_dependencies(view_number, &event_receiver, event);
@@ -330,15 +335,18 @@ impl, V: Versions>
);
self.proposal_dependencies
.insert(view_number, dependency_task.run());
+
+ Ok(())
}
/// Update the latest proposed view number.
#[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
async fn update_latest_proposed_view(&mut self, new_view: TYPES::View) -> bool {
if *self.latest_proposed_view < *new_view {
- debug!(
+ tracing::debug!(
"Updating latest proposed view from {} to {}",
- *self.latest_proposed_view, *new_view
+ *self.latest_proposed_view,
+ *new_view
);
// Cancel the old dependency tasks.
@@ -363,17 +371,17 @@ impl, V: Versions>
event: Arc>,
event_receiver: Receiver>>,
event_sender: Sender>>,
- ) {
+ ) -> Result<()> {
match event.as_ref() {
HotShotEvent::UpgradeCertificateFormed(cert) => {
- debug!(
+ tracing::debug!(
"Upgrade certificate received for view {}!",
*cert.view_number
);
// Update our current upgrade_cert as long as we still have a chance of reaching a decide on it in time.
if cert.data.decide_by >= self.latest_proposed_view + 3 {
- debug!("Updating current formed_upgrade_certificate");
+ tracing::debug!("Updating current formed_upgrade_certificate");
self.formed_upgrade_certificate = Some(cert.clone());
}
@@ -389,7 +397,7 @@ impl, V: Versions>
event_receiver,
event_sender,
Arc::clone(&event),
- );
+ )?;
}
either::Left(qc) => {
// Only update if the qc is from a newer view
@@ -422,24 +430,24 @@ impl, V: Versions>
event_receiver,
event_sender,
Arc::clone(&event),
- );
+ )?;
}
HotShotEvent::ViewSyncFinalizeCertificate2Recv(certificate) => {
let epoch_number = self.consensus.read().await.cur_epoch();
- if !certificate
- .is_valid_cert(
- self.quorum_membership.as_ref(),
- epoch_number,
- &self.upgrade_lock,
- )
- .await
- {
+
+ ensure!(
+ certificate
+ .is_valid_cert(
+ self.quorum_membership.as_ref(),
+ epoch_number,
+ &self.upgrade_lock
+ )
+ .await,
warn!(
"View Sync Finalize certificate {:?} was invalid",
certificate.data()
- );
- return;
- }
+ )
+ );
let view_number = certificate.view_number;
@@ -449,7 +457,7 @@ impl, V: Versions>
event_receiver,
event_sender,
event,
- );
+ )?;
}
HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
let view_number = proposal.data.view_number();
@@ -466,14 +474,15 @@ impl, V: Versions>
event_receiver,
event_sender,
Arc::clone(&event),
- );
+ )?;
}
HotShotEvent::QuorumProposalSend(proposal, _) => {
let view = proposal.data.view_number();
- if !self.update_latest_proposed_view(view).await {
- tracing::trace!("Failed to update latest proposed view");
- return;
- }
+
+ ensure!(
+ self.update_latest_proposed_view(view).await,
+ "Failed to update latest proposed view"
+ );
}
HotShotEvent::VidDisperseSend(vid_share, _) => {
let view_number = vid_share.data.view_number();
@@ -485,17 +494,27 @@ impl, V: Versions>
event_receiver,
event_sender,
Arc::clone(&event),
- );
+ )?;
}
HotShotEvent::UpdateHighQc(qc) => {
- // First, update the high QC.
- if let Err(e) = self.consensus.write().await.update_high_qc(qc.clone()) {
- tracing::trace!("Failed to update high qc; error = {e}");
- }
-
- if let Err(e) = self.storage.write().await.update_high_qc(qc.clone()).await {
- warn!("Failed to store High QC of QC we formed; error = {:?}", e);
- }
+ // First update the high QC internally
+ self.consensus
+ .write()
+ .await
+ .update_high_qc(qc.clone())
+ .wrap()
+ .context(error!(
+ "Failed to update high QC in internal consensus state!"
+ ))?;
+
+ // Then update the high QC in storage
+ self.storage
+ .write()
+ .await
+ .update_high_qc(qc.clone())
+ .await
+ .wrap()
+ .context(error!("Failed to update high QC in storage!"))?;
broadcast_event(
HotShotEvent::HighQcUpdated(qc.clone()).into(),
@@ -512,10 +531,11 @@ impl, V: Versions>
event_receiver,
event_sender,
Arc::clone(&event),
- );
+ )?;
}
_ => {}
}
+ Ok(())
}
}
@@ -531,9 +551,7 @@ impl, V: Versions> TaskState
sender: &Sender>,
receiver: &Receiver>,
) -> Result<()> {
- self.handle(event, receiver.clone(), sender.clone()).await;
-
- Ok(())
+ self.handle(event, receiver.clone(), sender.clone()).await
}
async fn cancel_subtasks(&mut self) {
diff --git a/crates/task-impls/src/quorum_proposal_recv/handlers.rs b/crates/task-impls/src/quorum_proposal_recv/handlers.rs
index 2193bf3dfd..3d1957c84a 100644
--- a/crates/task-impls/src/quorum_proposal_recv/handlers.rs
+++ b/crates/task-impls/src/quorum_proposal_recv/handlers.rs
@@ -8,7 +8,6 @@
use std::sync::Arc;
-use anyhow::{bail, Context, Result};
use async_broadcast::{broadcast, Receiver, Sender};
use async_lock::RwLockUpgradableReadGuard;
use committable::Committable;
@@ -26,7 +25,8 @@ use hotshot_types::{
utils::{View, ViewInner},
vote::{Certificate, HasViewNumber},
};
-use tracing::{debug, error, instrument, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use super::QuorumProposalRecvTaskState;
use crate::{
@@ -78,7 +78,7 @@ async fn validate_proposal_liveness(view_number, event_sender, task_state).await {
- debug!("Liveness Branch - Failed to update view; error = {e:#}");
+ tracing::debug!("Liveness Branch - Failed to update view; error = {e:#}");
}
if !liveness_check {
@@ -128,7 +128,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
validate_proposal_view_and_certs(proposal, task_state)
.await
- .context("Failed to validate proposal view or attached certs")?;
+ .context(warn!("Failed to validate proposal view or attached certs"))?;
let view_number = proposal.data.view_number();
let justify_qc = proposal.data.justify_qc.clone();
@@ -220,7 +220,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
.await;
let Some((parent_leaf, _parent_state)) = parent else {
- warn!(
+ tracing::warn!(
"Proposal's parent missing from storage with commitment: {:?}",
justify_qc.data.leaf_commit
);
@@ -239,7 +239,7 @@ pub(crate) async fn handle_quorum_proposal_recv<
// NOTE: We could update our view with a valid TC but invalid QC, but that is not what we do here
if let Err(e) = update_view::(view_number, event_sender, task_state).await {
- debug!("Full Branch - Failed to update view; error = {e:#}");
+ tracing::debug!("Full Branch - Failed to update view; error = {e:#}");
}
Ok(())
diff --git a/crates/task-impls/src/quorum_proposal_recv/mod.rs b/crates/task-impls/src/quorum_proposal_recv/mod.rs
index 281e472fd0..7c3ab2ed24 100644
--- a/crates/task-impls/src/quorum_proposal_recv/mod.rs
+++ b/crates/task-impls/src/quorum_proposal_recv/mod.rs
@@ -8,7 +8,6 @@
use std::{collections::BTreeMap, sync::Arc};
-use anyhow::{bail, Result};
use async_broadcast::{broadcast, Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
@@ -31,6 +30,7 @@ use hotshot_types::{
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
+use utils::anytrace::{bail, Result};
use vbs::version::Version;
use self::handlers::handle_quorum_proposal_recv;
diff --git a/crates/task-impls/src/quorum_vote/handlers.rs b/crates/task-impls/src/quorum_vote/handlers.rs
index d414573e19..656737524f 100644
--- a/crates/task-impls/src/quorum_vote/handlers.rs
+++ b/crates/task-impls/src/quorum_vote/handlers.rs
@@ -6,7 +6,6 @@
use std::sync::Arc;
-use anyhow::Result;
use async_broadcast::Sender;
use chrono::Utc;
use hotshot_types::{
@@ -19,7 +18,8 @@ use hotshot_types::{
},
vote::HasViewNumber,
};
-use tracing::{debug, instrument};
+use tracing::instrument;
+use utils::anytrace::*;
use super::QuorumVoteTaskState;
use crate::{
@@ -115,7 +115,7 @@ pub(crate) async fn handle_quorum_proposal_validated<
.number_of_views_per_decide_event
.add_point(cur_number_of_views_per_decide_event as f64);
- debug!(
+ tracing::debug!(
"Sending Decide for view {:?}",
consensus_writer.last_decided_view()
);
@@ -139,7 +139,7 @@ pub(crate) async fn handle_quorum_proposal_validated<
.await;
broadcast_event(Arc::new(HotShotEvent::LeafDecided(leaves_decided)), sender).await;
- debug!("Successfully sent decide event");
+ tracing::debug!("Successfully sent decide event");
}
Ok(())
diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs
index 7d0fa4833b..9285e627ab 100644
--- a/crates/task-impls/src/quorum_vote/mod.rs
+++ b/crates/task-impls/src/quorum_vote/mod.rs
@@ -6,7 +6,6 @@
use std::{collections::BTreeMap, sync::Arc};
-use anyhow::{bail, ensure, Context, Result};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
@@ -38,7 +37,8 @@ use hotshot_types::{
use jf_vid::VidScheme;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
-use tracing::{debug, error, instrument, trace, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use crate::{
events::HotShotEvent,
@@ -123,7 +123,7 @@ impl + 'static, V: Versions>
.await
.ok(),
};
- let parent = maybe_parent.context(format!(
+ let parent = maybe_parent.context(info!(
"Proposal's parent missing from storage with commitment: {:?}, proposal view {:?}",
justify_qc.data().leaf_commit,
proposed_leaf.view_number(),
@@ -131,7 +131,7 @@ impl + 'static, V: Versions>
let consensus_reader = self.consensus.read().await;
let (Some(parent_state), _) = consensus_reader.state_and_delta(parent.view_number()) else {
- bail!("Parent state not found! Consensus internally inconsistent")
+ bail!("Parent state not found! Consensus internally inconsistent");
};
drop(consensus_reader);
@@ -147,7 +147,8 @@ impl + 'static, V: Versions>
version,
)
.await
- .context("Block header doesn't extend the proposal!")?;
+ .wrap()
+ .context(warn!("Block header doesn't extend the proposal!"))?;
let state = Arc::new(validated_state);
let delta = Arc::new(state_delta);
@@ -189,7 +190,9 @@ impl + 'static, V: Versions>
.write()
.await
.update_undecided_state(new_leaves, new_state)
- .await?;
+ .await
+ .wrap()
+ .context(error!("Failed to update undecided state"))?;
Ok(())
}
@@ -204,10 +207,10 @@ impl + 'static, V: Versions>
ensure!(
self.quorum_membership
.has_stake(&self.public_key, self.epoch_number),
- format!(
+ info!(
"We were not chosen for quorum committee on {:?}",
self.view_number
- ),
+ )
);
// Create and send the vote.
@@ -221,8 +224,9 @@ impl + 'static, V: Versions>
&self.upgrade_lock,
)
.await
- .context("Failed to sign vote")?;
- debug!(
+ .wrap()
+ .context(error!("Failed to sign vote. This should never happen."))?;
+ tracing::debug!(
"sending vote to next quorum leader {:?}",
vote.view_number() + 1
);
@@ -232,7 +236,8 @@ impl + 'static, V: Versions>
.await
.append_vid(&vid_share)
.await
- .context("Failed to store VID share")?;
+ .wrap()
+ .context(error!("Failed to store VID share"))?;
broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &self.sender).await;
Ok(())
@@ -283,7 +288,7 @@ impl + 'static, V: Versions> Handl
let proposal_payload_comm = proposal.block_header.payload_commitment();
if let Some(comm) = payload_commitment {
if proposal_payload_comm != comm {
- error!("Quorum proposal has inconsistent payload commitment with DAC or VID.");
+ tracing::error!("Quorum proposal has inconsistent payload commitment with DAC or VID.");
return;
}
} else {
@@ -292,7 +297,7 @@ impl + 'static, V: Versions> Handl
let parent_commitment = parent_leaf.commit(&self.upgrade_lock).await;
let proposed_leaf = Leaf::from_quorum_proposal(proposal);
if proposed_leaf.parent_commitment() != parent_commitment {
- warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote.");
+ tracing::warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote.");
return;
}
leaf = Some(proposed_leaf);
@@ -301,7 +306,7 @@ impl + 'static, V: Versions> Handl
let cert_payload_comm = cert.data().payload_commit;
if let Some(comm) = payload_commitment {
if cert_payload_comm != comm {
- error!("DAC has inconsistent payload commitment with quorum proposal or VID.");
+ tracing::error!("DAC has inconsistent payload commitment with quorum proposal or VID.");
return;
}
} else {
@@ -313,7 +318,7 @@ impl + 'static, V: Versions> Handl
vid_share = Some(share.clone());
if let Some(comm) = payload_commitment {
if vid_payload_commitment != comm {
- error!("VID has inconsistent payload commitment with quorum proposal or DAC.");
+ tracing::error!("VID has inconsistent payload commitment with quorum proposal or DAC.");
return;
}
} else {
@@ -332,7 +337,7 @@ impl + 'static, V: Versions> Handl
.await;
let Some(vid_share) = vid_share else {
- error!(
+ tracing::error!(
"We don't have the VID share for this view {:?}, but we should, because the vote dependencies have completed.",
self.view_number
);
@@ -340,7 +345,7 @@ impl + 'static, V: Versions> Handl
};
let Some(leaf) = leaf else {
- error!(
+ tracing::error!(
"We don't have the leaf for this view {:?}, but we should, because the vote dependencies have completed.",
self.view_number
);
@@ -349,12 +354,12 @@ impl + 'static, V: Versions> Handl
// Update internal state
if let Err(e) = self.update_shared_state(&leaf, &vid_share).await {
- error!("Failed to update shared consensus state; error = {e:#}");
+ tracing::error!("Failed to update shared consensus state; error = {e:#}");
return;
}
if let Err(e) = self.submit_vote(leaf, vid_share).await {
- debug!("Failed to vote; error = {e:#}");
+ tracing::debug!("Failed to vote; error = {e:#}");
}
}
}
@@ -440,7 +445,7 @@ impl, V: Versions> QuorumVoteTaskS
}
};
if event_view == view_number {
- trace!("Vote dependency {:?} completed", dependency_type);
+ tracing::trace!("Vote dependency {:?} completed", dependency_type);
return true;
}
false
@@ -512,16 +517,17 @@ impl, V: Versions> QuorumVoteTaskS
#[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
async fn update_latest_voted_view(&mut self, new_view: TYPES::View) -> bool {
if *self.latest_voted_view < *new_view {
- debug!(
+ tracing::debug!(
"Updating next vote view from {} to {} in the quorum vote task",
- *self.latest_voted_view, *new_view
+ *self.latest_voted_view,
+ *new_view
);
// Cancel the old dependency tasks.
for view in *self.latest_voted_view..(*new_view) {
if let Some(dependency) = self.vote_dependencies.remove(&TYPES::View::new(view)) {
cancel_task(dependency).await;
- debug!("Vote dependency removed for view {:?}", view);
+ tracing::debug!("Vote dependency removed for view {:?}", view);
}
}
@@ -539,17 +545,20 @@ impl, V: Versions> QuorumVoteTaskS
event: Arc>,
event_receiver: Receiver>>,
event_sender: Sender>>,
- ) {
+ ) -> Result<()> {
let current_epoch = self.consensus.read().await.cur_epoch();
+
match event.as_ref() {
HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
- trace!("Received Proposal for view {}", *proposal.view_number());
+ tracing::trace!("Received Proposal for view {}", *proposal.view_number());
// Handle the event before creating the dependency task.
if let Err(e) =
handle_quorum_proposal_validated(proposal, &event_sender, self).await
{
- debug!("Failed to handle QuorumProposalValidated event; error = {e:#}");
+ tracing::debug!(
+ "Failed to handle QuorumProposalValidated event; error = {e:#}"
+ );
}
self.create_dependency_task_if_new(
@@ -562,23 +571,25 @@ impl, V: Versions> QuorumVoteTaskS
}
HotShotEvent::DaCertificateRecv(cert) => {
let view = cert.view_number;
- trace!("Received DAC for view {}", *view);
- if view <= self.latest_voted_view {
- return;
- }
+
+ tracing::trace!("Received DAC for view {}", *view);
+ // Do nothing if the DAC is old
+ ensure!(
+ view > self.latest_voted_view,
+ "Received DAC for an older view."
+ );
let current_epoch = self.consensus.read().await.cur_epoch();
// Validate the DAC.
- if !cert
- .is_valid_cert(
+ ensure!(
+ cert.is_valid_cert(
self.da_membership.as_ref(),
current_epoch,
- &self.upgrade_lock,
+ &self.upgrade_lock
)
- .await
- {
- return;
- }
+ .await,
+ warn!("Invalid DAC")
+ );
// Add to the storage.
self.consensus
@@ -601,51 +612,43 @@ impl, V: Versions> QuorumVoteTaskS
}
HotShotEvent::VidShareRecv(sender, disperse) => {
let view = disperse.data.view_number();
- trace!("Received VID share for view {}", *view);
- if view <= self.latest_voted_view {
- return;
- }
+ // Do nothing if the VID share is old
+ tracing::trace!("Received VID share for view {}", *view);
+ ensure!(
+ view > self.latest_voted_view,
+ "Received VID share for an older view."
+ );
// Validate the VID share.
let payload_commitment = disperse.data.payload_commitment;
let current_epoch = self.consensus.read().await.cur_epoch();
- // Check sender of VID disperse share is signed by DA committee member
- let validate_sender = sender
- .validate(&disperse.signature, payload_commitment.as_ref())
- && self
- .da_membership
+
+ // Check that the signature is valid
+ ensure!(
+ sender.validate(&disperse.signature, payload_commitment.as_ref()),
+ "VID share signature is invalid"
+ );
+
+ // ensure that the VID share was sent by a DA member OR the view leader
+ ensure!(
+ self.da_membership
.committee_members(view, current_epoch)
- .contains(sender);
-
- // Check whether the data satisfies one of the following.
- // * From the right leader for this view.
- // * Calculated and signed by the current node.
- let validated = self
- .public_key
- .validate(&disperse.signature, payload_commitment.as_ref())
- || self
- .quorum_membership
- .leader(view, current_epoch)
- .validate(&disperse.signature, payload_commitment.as_ref());
- if !validate_sender && !validated {
- warn!("Failed to validated the VID dispersal/share sig.");
- return;
- }
+ .contains(sender)
+ || *sender == self.quorum_membership.leader(view, current_epoch)?,
+ "VID share was not sent by a DA member or the view leader."
+ );
// NOTE: `verify_share` returns a nested `Result`, so we must check both the inner
// and outer results
- #[allow(clippy::no_effect)]
match vid_scheme(self.quorum_membership.total_nodes(current_epoch)).verify_share(
&disperse.data.share,
&disperse.data.common,
&payload_commitment,
) {
Ok(Err(())) | Err(_) => {
- return;
- }
- Ok(Ok(())) => {
- ();
+ bail!("Failed to verify VID share");
}
+ Ok(Ok(())) => {}
}
self.consensus
@@ -653,10 +656,10 @@ impl, V: Versions> QuorumVoteTaskS
.await
.update_vid_shares(view, disperse.clone());
- if disperse.data.recipient_key != self.public_key {
- debug!("Got a Valid VID share but it's not for our key");
- return;
- }
+ ensure!(
+ disperse.data.recipient_key == self.public_key,
+ "Got a Valid VID share but it's not for our key"
+ );
broadcast_event(
Arc::new(HotShotEvent::VidShareValidated(disperse.clone())),
@@ -672,10 +675,9 @@ impl, V: Versions> QuorumVoteTaskS
);
}
HotShotEvent::QuorumVoteDependenciesValidated(view_number) => {
- debug!("All vote dependencies verified for view {:?}", view_number);
+ tracing::debug!("All vote dependencies verified for view {:?}", view_number);
if !self.update_latest_voted_view(*view_number).await {
- debug!("view not updated");
- return;
+ tracing::debug!("view not updated");
}
}
HotShotEvent::Timeout(view) => {
@@ -697,6 +699,7 @@ impl, V: Versions> QuorumVoteTaskS
}
_ => {}
}
+ Ok(())
}
}
@@ -712,9 +715,7 @@ impl, V: Versions> TaskState
sender: &Sender>,
receiver: &Receiver>,
) -> Result<()> {
- self.handle(event, receiver.clone(), sender.clone()).await;
-
- Ok(())
+ self.handle(event, receiver.clone(), sender.clone()).await
}
async fn cancel_subtasks(&mut self) {
diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs
index 8cd336e7b1..c7bc3c8957 100644
--- a/crates/task-impls/src/request.rs
+++ b/crates/task-impls/src/request.rs
@@ -13,7 +13,6 @@ use std::{
time::Duration,
};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn, async_timeout};
#[cfg(async_executor_impl = "async-std")]
@@ -38,6 +37,7 @@ use sha2::{Digest, Sha256};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::instrument;
+use utils::anytrace::Result;
use crate::{events::HotShotEvent, helpers::broadcast_event};
diff --git a/crates/task-impls/src/rewind.rs b/crates/task-impls/src/rewind.rs
index 669b410b52..9ae424b62b 100644
--- a/crates/task-impls/src/rewind.rs
+++ b/crates/task-impls/src/rewind.rs
@@ -6,11 +6,11 @@
use std::{fs::OpenOptions, io::Write, sync::Arc};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::traits::node_implementation::NodeType;
+use utils::anytrace::Result;
use crate::events::HotShotEvent;
diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs
index 354d9d1fd6..72bcd8f78b 100644
--- a/crates/task-impls/src/transactions.rs
+++ b/crates/task-impls/src/transactions.rs
@@ -9,7 +9,6 @@ use std::{
time::{Duration, Instant},
};
-use anyhow::{bail, ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_timeout};
use async_trait::async_trait;
@@ -32,8 +31,9 @@ use hotshot_types::{
utils::ViewInner,
vid::{VidCommitment, VidPrecomputeData},
};
-use tracing::{debug, error, info, instrument, warn};
+use tracing::instrument;
use url::Url;
+use utils::anytrace::*;
use vbs::version::{StaticVersionType, Version};
use vec1::Vec1;
@@ -149,7 +149,7 @@ impl, V: Versions> TransactionTask
let version = match self.upgrade_lock.version(block_view).await {
Ok(v) => v,
Err(err) => {
- error!("Upgrade certificate requires unsupported version, refusing to request blocks: {}", err);
+ tracing::error!("Upgrade certificate requires unsupported version, refusing to request blocks: {}", err);
return None;
}
};
@@ -191,7 +191,7 @@ impl, V: Versions> TransactionTask
.await;
} else {
// If we couldn't get a block, send an empty block
- info!(
+ tracing::info!(
"Failed to get a block for view {:?}, proposing empty block",
block_view
);
@@ -209,7 +209,7 @@ impl, V: Versions> TransactionTask
self.membership.total_nodes(self.cur_epoch),
version,
) else {
- error!("Failed to get null fee");
+ tracing::error!("Failed to get null fee");
return None;
};
@@ -254,13 +254,14 @@ impl, V: Versions> TransactionTask
.await
.as_ref()
.is_some_and(|cert| cert.upgrading_in(block_view)),
- "Not requesting block because we are upgrading",
+ info!("Not requesting block because we are upgrading")
);
let (parent_view, parent_hash) = self
.last_vid_commitment_retry(block_view, task_start_time)
.await
- .context("Failed to find parent hash in time")?;
+ .wrap()
+ .context(warn!("Failed to find parent hash in time"))?;
let start = Instant::now();
@@ -270,10 +271,11 @@ impl, V: Versions> TransactionTask
.fetch_auction_result(block_view),
)
.await
- .context("Timeout while getting auction result")?;
+ .wrap()
+ .context(warn!("Timeout while getting auction result"))?;
let auction_result = maybe_auction_result
- .map_err(|e| warn!("Failed to get auction results: {e:#}"))
+ .map_err(|e| tracing::warn!("Failed to get auction results: {e:#}"))
.unwrap_or_default(); // We continue here, as we still have fallback builder URL
let mut futures = Vec::new();
@@ -319,13 +321,16 @@ impl, V: Versions> TransactionTask
let validated_state = self.consensus.read().await.decided_state();
let sequencing_fees = Vec1::try_from_vec(sequencing_fees)
- .context("Failed to receive a bundle from any builder.")?;
+ .wrap()
+ .context(warn!("Failed to receive a bundle from any builder."))?;
let (block_payload, metadata) = TYPES::BlockPayload::from_transactions(
transactions,
&validated_state,
&Arc::clone(&self.instance_state),
)
- .await?;
+ .await
+ .wrap()
+ .context(error!("Failed to construct block payload"))?;
Ok(PackedBundle::new(
block_payload.encode(),
@@ -348,7 +353,7 @@ impl, V: Versions> TransactionTask
self.membership.total_nodes(self.cur_epoch),
version,
) else {
- error!("Failed to calculate null block fee.");
+ tracing::error!("Failed to calculate null block fee.");
return None;
};
@@ -379,7 +384,7 @@ impl, V: Versions> TransactionTask
let version = match self.upgrade_lock.version(block_view).await {
Ok(v) => v,
Err(err) => {
- error!("Upgrade certificate requires unsupported version, refusing to request blocks: {}", err);
+ tracing::error!("Upgrade certificate requires unsupported version, refusing to request blocks: {}", err);
return None;
}
};
@@ -425,7 +430,7 @@ impl, V: Versions> TransactionTask
&mut self,
event: Arc>,
event_stream: Sender>>,
- ) -> Option {
+ ) -> Result<()> {
match event.as_ref() {
HotShotEvent::TransactionsRecv(transactions) => {
broadcast_event(
@@ -438,30 +443,36 @@ impl, V: Versions> TransactionTask
&self.output_event_stream,
)
.await;
-
- return None;
}
HotShotEvent::ViewChange(view) => {
let view = *view;
- debug!("view change in transactions to view {:?}", view);
- if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
- return None;
- }
+
+ tracing::debug!("view change in transactions to view {:?}", view);
+ ensure!(
+ *view > *self.cur_view || *self.cur_view == 0,
+ debug!(
+ "Received a view change to an older view: tried to change view to {:?} though we are at view {:?}", view, self.cur_view
+ )
+ );
let mut make_block = false;
if *view - *self.cur_view > 1 {
- info!("View changed by more than 1 going to view {:?}", view);
- make_block = self.membership.leader(view, self.cur_epoch) == self.public_key;
+ tracing::info!("View changed by more than 1 going to view {:?}", view);
+ make_block = self.membership.leader(view, self.cur_epoch)? == self.public_key;
}
self.cur_view = view;
let next_view = self.cur_view + 1;
let next_leader =
- self.membership.leader(next_view, self.cur_epoch) == self.public_key;
- if !make_block && !next_leader {
- debug!("Not next leader for view {:?}", self.cur_view);
- return None;
- }
+ self.membership.leader(next_view, self.cur_epoch)? == self.public_key;
+
+ ensure!(
+ make_block || next_leader,
+ debug!(
+ "Not making the block because we are not leader for view {:?}",
+ self.cur_view
+ )
+ );
if make_block {
self.handle_view_change(&event_stream, self.cur_view).await;
@@ -471,12 +482,9 @@ impl, V: Versions> TransactionTask
self.handle_view_change(&event_stream, next_view).await;
}
}
- HotShotEvent::Shutdown => {
- return Some(HotShotTaskCompleted);
- }
_ => {}
}
- None
+ Ok(())
}
/// Get VID commitment for the last successful view before `block_view`.
@@ -514,7 +522,9 @@ impl, V: Versions> TransactionTask
let view_data = consensus
.validated_state_map()
.get(&target_view)
- .context("Missing record for view {?target_view} in validated state")?;
+ .context(info!(
+ "Missing record for view {?target_view} in validated state"
+ ))?;
match view_data.view_inner {
ViewInner::Da { payload_commitment } => {
@@ -525,13 +535,13 @@ impl, V: Versions> TransactionTask
..
} => {
let leaf = consensus.saved_leaves().get(&leaf_commitment).context
- ("Missing leaf with commitment {leaf_commitment} for view {target_view} in saved_leaves")?;
+ (info!("Missing leaf with commitment {leaf_commitment} for view {target_view} in saved_leaves"))?;
return Ok((target_view, leaf.payload_commitment()));
}
ViewInner::Failed => {
// For failed views, backtrack
target_view =
- TYPES::View::new(target_view.checked_sub(1).context("Reached genesis")?);
+ TYPES::View::new(target_view.checked_sub(1).context(warn!("Reached genesis. Something is wrong -- have we not decided any blocks since genesis?"))?);
continue;
}
}
@@ -560,7 +570,7 @@ impl, V: Versions> TransactionTask
) {
Ok(sig) => sig,
Err(err) => {
- error!(%err, "Failed to sign block hash");
+ tracing::error!(%err, "Failed to sign block hash");
return None;
}
};
@@ -588,7 +598,7 @@ impl, V: Versions> TransactionTask
// We timed out while getting available blocks
Err(err) => {
- info!(%err, "Timeout while getting available blocks");
+ tracing::info!(%err, "Timeout while getting available blocks");
return None;
}
}
@@ -674,7 +684,7 @@ impl, V: Versions> TransactionTask
parent_comm: VidCommitment,
view_number: TYPES::View,
parent_comm_sig: &<::SignatureKey as SignatureKey>::PureAssembledSignatureType,
- ) -> anyhow::Result> {
+ ) -> Result> {
let mut available_blocks = self
.get_available_blocks(parent_comm, view_number, parent_comm_sig)
.await;
@@ -792,9 +802,7 @@ impl, V: Versions> TaskState
sender: &Sender>,
_receiver: &Receiver>,
) -> Result<()> {
- self.handle(event, sender.clone()).await;
-
- Ok(())
+ self.handle(event, sender.clone()).await
}
async fn cancel_subtasks(&mut self) {}
diff --git a/crates/task-impls/src/upgrade.rs b/crates/task-impls/src/upgrade.rs
index 4992b06887..fb4f4de7f4 100644
--- a/crates/task-impls/src/upgrade.rs
+++ b/crates/task-impls/src/upgrade.rs
@@ -6,7 +6,6 @@
use std::{marker::PhantomData, sync::Arc, time::SystemTime};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use committable::Committable;
@@ -28,11 +27,12 @@ use hotshot_types::{
},
vote::HasViewNumber,
};
-use tracing::{debug, error, info, instrument, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use vbs::version::StaticVersionType;
use crate::{
- events::{HotShotEvent, HotShotTaskCompleted},
+ events::HotShotEvent,
helpers::broadcast_event,
vote_collection::{handle_vote, VoteCollectorsMap},
};
@@ -109,45 +109,47 @@ impl, V: Versions> UpgradeTaskStat
&mut self,
event: Arc>,
tx: Sender>>,
- ) -> Option {
+ ) -> Result<()> {
match event.as_ref() {
HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
- info!("Received upgrade proposal: {:?}", proposal);
+ tracing::info!("Received upgrade proposal: {:?}", proposal);
let view = *proposal.data.view_number();
// Skip voting if the version has already been upgraded.
- if self.upgraded().await {
- info!(
- "Already upgraded to {:?}, skip voting.",
- V::Upgrade::VERSION
- );
- return None;
- }
+ ensure!(
+ !self.upgraded().await,
+ info!("Already upgraded to {:?}; not voting.", V::Upgrade::VERSION)
+ );
let time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
- .ok()?
+ .wrap()
+ .context(error!(
+ "Failed to calculate duration. This should never happen."
+ ))?
.as_secs();
- if time < self.start_voting_time || time >= self.stop_voting_time {
- return None;
- }
+ ensure!(
+ time >= self.start_voting_time && time < self.stop_voting_time,
+ "Refusing to vote because we are no longer in the configured vote time window."
+ );
- if view < self.start_voting_view || view >= self.stop_voting_view {
- return None;
- }
+ ensure!(
+ view >= self.start_voting_view && view < self.stop_voting_view,
+ "Refusing to vote because we are no longer in the configured vote view window."
+ );
// If the proposal does not match our upgrade target, we immediately exit.
- if proposal.data.upgrade_proposal.new_version_hash != V::UPGRADE_HASH
- || proposal.data.upgrade_proposal.old_version != V::Base::VERSION
- || proposal.data.upgrade_proposal.new_version != V::Upgrade::VERSION
- {
- return None;
- }
+ ensure!(
+ proposal.data.upgrade_proposal.new_version_hash == V::UPGRADE_HASH
+ && proposal.data.upgrade_proposal.old_version == V::Base::VERSION
+ && proposal.data.upgrade_proposal.new_version == V::Upgrade::VERSION,
+ "Proposal does not match our upgrade target"
+ );
// If we have an upgrade target, we validate that the proposal is relevant for the current view.
- info!(
+ tracing::info!(
"Upgrade proposal received for view: {:?}",
proposal.data.view_number()
);
@@ -169,20 +171,23 @@ impl, V: Versions> UpgradeTaskStat
// the `UpgradeProposalRecv` event. Otherwise, the view number subtraction below will
// cause an overflow error.
// TODO Come back to this - we probably don't need this, but we should also never receive a UpgradeCertificate where this fails, investigate block ready so it doesn't make one for the genesis block
- if self.cur_view != TYPES::View::genesis() && view < self.cur_view - 1 {
- warn!("Discarding old upgrade proposal; the proposal is for view {:?}, but the current view is {:?}.",
+ ensure!(
+ self.cur_view != TYPES::View::genesis() && *view >= self.cur_view.saturating_sub(1),
+ warn!(
+ "Discarding old upgrade proposal; the proposal is for view {:?}, but the current view is {:?}.",
view,
self.cur_view
- );
- return None;
- }
+ )
+ );
// We then validate that the proposal was issued by the leader for the view.
- let view_leader_key = self.quorum_membership.leader(view, self.cur_epoch);
- if &view_leader_key != sender {
- error!("Upgrade proposal doesn't have expected leader key for view {} \n Upgrade proposal is: {:?}", *view, proposal.data.clone());
- return None;
- }
+ let view_leader_key = self.quorum_membership.leader(view, self.cur_epoch)?;
+ ensure!(
+ view_leader_key == *sender,
+ info!(
+ "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade proposal is: {:?}", *view, proposal.data.clone()
+ )
+ );
// At this point, we've checked that:
// * the proposal was expected,
@@ -201,36 +206,33 @@ impl, V: Versions> UpgradeTaskStat
.await;
// If everything is fine up to here, we generate and send a vote on the proposal.
- let Ok(vote) = UpgradeVote::create_signed_vote(
+ let vote = UpgradeVote::create_signed_vote(
proposal.data.upgrade_proposal.clone(),
view,
&self.public_key,
&self.private_key,
&self.upgrade_lock,
)
- .await
- else {
- error!("Failed to sign UpgradeVote!");
- return None;
- };
- debug!("Sending upgrade vote {:?}", vote.view_number());
+ .await?;
+
+ tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
}
HotShotEvent::UpgradeVoteRecv(ref vote) => {
- debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
+ tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
// Check if we are the leader.
{
let view = vote.view_number();
- if self.quorum_membership.leader(view, self.cur_epoch) != self.public_key {
- error!(
+ ensure!(
+ self.quorum_membership.leader(view, self.cur_epoch)? == self.public_key,
+ debug!(
"We are not the leader for view {} are we leader for next view? {}",
*view,
- self.quorum_membership.leader(view + 1, self.cur_epoch)
+ self.quorum_membership.leader(view + 1, self.cur_epoch)?
== self.public_key
- );
- return None;
- }
+ )
+ );
}
handle_vote(
@@ -244,19 +246,20 @@ impl, V: Versions> UpgradeTaskStat
&tx,
&self.upgrade_lock,
)
- .await;
+ .await?;
}
HotShotEvent::ViewChange(new_view) => {
- if self.cur_view >= *new_view {
- return None;
- }
+ ensure!(self.cur_view < *new_view || *self.cur_view == 0);
self.cur_view = *new_view;
let view: u64 = *self.cur_view;
let time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
- .ok()?
+ .wrap()
+ .context(error!(
+ "Failed to calculate duration. This should never happen."
+ ))?
.as_secs();
// We try to form a certificate 5 views before we're leader.
@@ -268,7 +271,7 @@ impl, V: Versions> UpgradeTaskStat
&& self.quorum_membership.leader(
TYPES::View::new(view + UPGRADE_PROPOSE_OFFSET),
self.cur_epoch,
- ) == self.public_key
+ )? == self.public_key
{
let upgrade_proposal_data = UpgradeProposalData {
old_version: V::Base::VERSION,
@@ -290,7 +293,7 @@ impl, V: Versions> UpgradeTaskStat
)
.expect("Failed to sign upgrade proposal commitment!");
- warn!("Sending upgrade proposal:\n\n {:?}", upgrade_proposal);
+ tracing::warn!("Sending upgrade proposal:\n\n {:?}", upgrade_proposal);
let message = Proposal {
data: upgrade_proposal,
@@ -307,16 +310,10 @@ impl, V: Versions> UpgradeTaskStat
)
.await;
}
-
- return None;
- }
- HotShotEvent::Shutdown => {
- error!("Shutting down because of shutdown signal!");
- return Some(HotShotTaskCompleted);
}
_ => {}
}
- None
+ Ok(())
}
}
@@ -333,7 +330,7 @@ impl, V: Versions> TaskState
sender: &Sender>,
_receiver: &Receiver>,
) -> Result<()> {
- self.handle(event, sender.clone()).await;
+ self.handle(event, sender.clone()).await?;
Ok(())
}
diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs
index 106203bd07..30d31dff7d 100644
--- a/crates/task-impls/src/vid.rs
+++ b/crates/task-impls/src/vid.rs
@@ -6,7 +6,6 @@
use std::{marker::PhantomData, sync::Arc};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_trait::async_trait;
use hotshot_task::task::TaskState;
@@ -21,6 +20,7 @@ use hotshot_types::{
},
};
use tracing::{debug, error, info, instrument};
+use utils::anytrace::Result;
use crate::{
events::{HotShotEvent, HotShotTaskCompleted},
diff --git a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs
index fb00054f88..46e4b11c12 100644
--- a/crates/task-impls/src/view_sync.rs
+++ b/crates/task-impls/src/view_sync.rs
@@ -4,7 +4,6 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see .
-#![allow(clippy::module_name_repetitions)]
use std::{
collections::{BTreeMap, HashMap},
fmt::Debug,
@@ -12,7 +11,6 @@ use std::{
time::Duration,
};
-use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
@@ -38,7 +36,8 @@ use hotshot_types::{
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
-use tracing::{debug, error, info, instrument, warn};
+use tracing::instrument;
+use utils::anytrace::*;
use crate::{
events::{HotShotEvent, HotShotTaskCompleted},
@@ -125,9 +124,7 @@ impl, V: Versions> TaskState
sender: &Sender>,
_receiver: &Receiver>,
) -> Result<()> {
- self.handle(event, sender.clone()).await;
-
- Ok(())
+ self.handle(event, sender.clone()).await
}
async fn cancel_subtasks(&mut self) {}
@@ -199,7 +196,7 @@ impl, V: Versions> ViewSyncTaskSta
// This certificate is old, we can throw it away
// If next view = cert round, then that means we should already have a task running for it
if self.current_view > view {
- debug!("Already in a higher view than the view sync message");
+ tracing::debug!("Already in a higher view than the view sync message");
return;
}
@@ -207,7 +204,7 @@ impl, V: Versions> ViewSyncTaskSta
if let Some(replica_task) = task_map.get_mut(&view) {
// Forward event then return
- debug!("Forwarding message");
+ tracing::debug!("Forwarding message");
let result = replica_task
.handle(Arc::clone(&event), sender.clone())
.await;
@@ -258,28 +255,28 @@ impl, V: Versions> ViewSyncTaskSta
&mut self,
event: Arc>,
event_stream: Sender>>,
- ) {
+ ) -> Result<()> {
match event.as_ref() {
HotShotEvent::ViewSyncPreCommitCertificate2Recv(certificate) => {
- debug!("Received view sync cert for phase {:?}", certificate);
+ tracing::debug!("Received view sync cert for phase {:?}", certificate);
let view = certificate.view_number();
self.send_to_or_create_replica(event, view, &event_stream)
.await;
}
HotShotEvent::ViewSyncCommitCertificate2Recv(certificate) => {
- debug!("Received view sync cert for phase {:?}", certificate);
+ tracing::debug!("Received view sync cert for phase {:?}", certificate);
let view = certificate.view_number();
self.send_to_or_create_replica(event, view, &event_stream)
.await;
}
HotShotEvent::ViewSyncFinalizeCertificate2Recv(certificate) => {
- debug!("Received view sync cert for phase {:?}", certificate);
+ tracing::debug!("Received view sync cert for phase {:?}", certificate);
let view = certificate.view_number();
self.send_to_or_create_replica(event, view, &event_stream)
.await;
}
HotShotEvent::ViewSyncTimeout(view, _, _) => {
- debug!("view sync timeout in main task {:?}", view);
+ tracing::debug!("view sync timeout in main task {:?}", view);
let view = *view;
self.send_to_or_create_replica(event, view, &event_stream)
.await;
@@ -291,27 +288,27 @@ impl, V: Versions> ViewSyncTaskSta
let relay = vote.date().relay;
let relay_map = map.entry(vote_view).or_insert(BTreeMap::new());
if let Some(relay_task) = relay_map.get_mut(&relay) {
- debug!("Forwarding message");
- let result = relay_task
- .handle_vote_event(Arc::clone(&event), &event_stream)
- .await;
+ tracing::debug!("Forwarding message");
- if result == Some(HotShotTaskCompleted) {
- // The protocol has finished
+ // Handle the vote and check if the accumulator has returned successfully
+ if relay_task
+ .handle_vote_event(Arc::clone(&event), &event_stream)
+ .await?
+ .is_some()
+ {
map.remove(&vote_view);
}
- return;
+
+ return Ok(());
}
// We do not have a relay task already running, so start one
- if self
- .membership
- .leader(vote_view + relay, self.current_epoch)
- != self.public_key
- {
- debug!("View sync vote sent to wrong leader");
- return;
- }
+ ensure!(
+ self.membership
+ .leader(vote_view + relay, self.current_epoch)?
+ == self.public_key,
+ "View sync vote sent to wrong leader"
+ );
let info = AccumulatorInfo {
public_key: self.public_key.clone(),
@@ -322,10 +319,9 @@ impl, V: Versions> ViewSyncTaskSta
};
let vote_collector =
create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
- .await;
- if let Some(vote_task) = vote_collector {
- relay_map.insert(relay, vote_task);
- }
+ .await?;
+
+ relay_map.insert(relay, vote_collector);
}
HotShotEvent::ViewSyncCommitVoteRecv(ref vote) => {
@@ -334,27 +330,27 @@ impl, V: Versions> ViewSyncTaskSta
let relay = vote.date().relay;
let relay_map = map.entry(vote_view).or_insert(BTreeMap::new());
if let Some(relay_task) = relay_map.get_mut(&relay) {
- debug!("Forwarding message");
- let result = relay_task
- .handle_vote_event(Arc::clone(&event), &event_stream)
- .await;
+ tracing::debug!("Forwarding message");
- if result == Some(HotShotTaskCompleted) {
- // The protocol has finished
+ // Handle the vote and check if the accumulator has returned successfully
+ if relay_task
+ .handle_vote_event(Arc::clone(&event), &event_stream)
+ .await?
+ .is_some()
+ {
map.remove(&vote_view);
}
- return;
+
+ return Ok(());
}
// We do not have a relay task already running, so start one
- if self
- .membership
- .leader(vote_view + relay, self.current_epoch)
- != self.public_key
- {
- debug!("View sync vote sent to wrong leader");
- return;
- }
+ ensure!(
+ self.membership
+ .leader(vote_view + relay, self.current_epoch)?
+ == self.public_key,
+ debug!("View sync vote sent to wrong leader")
+ );
let info = AccumulatorInfo {
public_key: self.public_key.clone(),
@@ -363,12 +359,11 @@ impl, V: Versions> ViewSyncTaskSta
epoch: self.current_epoch,
id: self.id,
};
+
let vote_collector =
create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
- .await;
- if let Some(vote_task) = vote_collector {
- relay_map.insert(relay, vote_task);
- }
+ .await?;
+ relay_map.insert(relay, vote_collector);
}
HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
@@ -377,27 +372,27 @@ impl, V: Versions> ViewSyncTaskSta
let relay = vote.date().relay;
let relay_map = map.entry(vote_view).or_insert(BTreeMap::new());
if let Some(relay_task) = relay_map.get_mut(&relay) {
- debug!("Forwarding message");
- let result = relay_task
- .handle_vote_event(Arc::clone(&event), &event_stream)
- .await;
+ tracing::debug!("Forwarding message");
- if result == Some(HotShotTaskCompleted) {
- // The protocol has finished
+ // Handle the vote and check if the accumulator has returned successfully
+ if relay_task
+ .handle_vote_event(Arc::clone(&event), &event_stream)
+ .await?
+ .is_some()
+ {
map.remove(&vote_view);
}
- return;
+
+ return Ok(());
}
// We do not have a relay task already running, so start one
- if self
- .membership
- .leader(vote_view + relay, self.current_epoch)
- != self.public_key
- {
- debug!("View sync vote sent to wrong leader");
- return;
- }
+ ensure!(
+ self.membership
+ .leader(vote_view + relay, self.current_epoch)?
+ == self.public_key,
+ debug!("View sync vote sent to wrong leader")
+ );
let info = AccumulatorInfo {
public_key: self.public_key.clone(),
@@ -409,7 +404,7 @@ impl, V: Versions> ViewSyncTaskSta
let vote_collector =
create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
.await;
- if let Some(vote_task) = vote_collector {
+ if let Ok(vote_task) = vote_collector {
relay_map.insert(relay, vote_task);
}
}
@@ -417,9 +412,10 @@ impl, V: Versions> ViewSyncTaskSta
&HotShotEvent::ViewChange(new_view) => {
let new_view = TYPES::View::new(*new_view);
if self.current_view < new_view {
- debug!(
+ tracing::debug!(
"Change from view {} to view {} in view sync task",
- *self.current_view, *new_view
+ *self.current_view,
+ *new_view
);
self.current_view = new_view;
@@ -454,13 +450,14 @@ impl, V: Versions> ViewSyncTaskSta
}
&HotShotEvent::Timeout(view_number) => {
// This is an old timeout and we can ignore it
- if view_number <= TYPES::View::new(*self.current_view) {
- return;
- }
+ ensure!(
+ view_number > self.current_view,
+ debug!("Discarding old timeout vote.")
+ );
self.num_timeouts_tracked += 1;
- let leader = self.membership.leader(view_number, self.current_epoch);
- warn!(
+ let leader = self.membership.leader(view_number, self.current_epoch)?;
+ tracing::warn!(
%leader,
leader_mnemonic = cdn_proto::util::mnemonic(&leader),
view_number = *view_number,
@@ -469,11 +466,11 @@ impl, V: Versions> ViewSyncTaskSta
);
if self.num_timeouts_tracked >= 3 {
- error!("Too many consecutive timeouts! This shouldn't happen");
+ tracing::error!("Too many consecutive timeouts! This shouldn't happen");
}
if self.num_timeouts_tracked >= 2 {
- error!("Starting view sync protocol for view {}", *view_number + 1);
+ tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
self.send_to_or_create_replica(
Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
@@ -496,6 +493,7 @@ impl, V: Versions> ViewSyncTaskSta
_ => {}
}
+ Ok(())
}
}
@@ -515,7 +513,7 @@ impl, V: Versions>
// Ignore certificate if it is for an older round
if certificate.view_number() < self.next_view {
- warn!("We're already in a higher round");
+ tracing::warn!("We're already in a higher round");
return None;
}
@@ -529,7 +527,7 @@ impl, V: Versions>
)
.await
{
- error!("Not valid view sync cert! {:?}", certificate.data());
+ tracing::error!("Not valid view sync cert! {:?}", certificate.data());
return None;
}
@@ -556,7 +554,7 @@ impl, V: Versions>
)
.await
else {
- error!("Failed to sign ViewSyncCommitData!");
+ tracing::error!("Failed to sign ViewSyncCommitData!");
return None;
};
let message = GeneralConsensusMessage::::ViewSyncCommitVote(vote);
@@ -581,7 +579,7 @@ impl, V: Versions>
let timeout = self.view_sync_timeout;
async move {
async_sleep(timeout).await;
- warn!("Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = {}", relay);
+ tracing::warn!("Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = {}", relay);
broadcast_event(
Arc::new(HotShotEvent::ViewSyncTimeout(
@@ -601,7 +599,7 @@ impl, V: Versions>
// Ignore certificate if it is for an older round
if certificate.view_number() < self.next_view {
- warn!("We're already in a higher round");
+ tracing::warn!("We're already in a higher round");
return None;
}
@@ -615,7 +613,7 @@ impl, V: Versions>
)
.await
{
- error!("Not valid view sync cert! {:?}", certificate.data());
+ tracing::error!("Not valid view sync cert! {:?}", certificate.data());
return None;
}
@@ -642,7 +640,7 @@ impl, V: Versions>
)
.await
else {
- error!("Failed to sign view sync finalized vote!");
+ tracing::error!("Failed to sign view sync finalized vote!");
return None;
};
let message = GeneralConsensusMessage::::ViewSyncFinalizeVote(vote);
@@ -655,7 +653,7 @@ impl, V: Versions>
.await;
}
- info!(
+ tracing::info!(
"View sync protocol has received view sync evidence to update the view to {}",
*self.next_view
);
@@ -677,7 +675,7 @@ impl, V: Versions>
let timeout = self.view_sync_timeout;
async move {
async_sleep(timeout).await;
- warn!(
+ tracing::warn!(
"Vote sending timed out in ViewSyncCommitCertificateRecv, relay = {}",
relay
);
@@ -697,7 +695,7 @@ impl, V: Versions>
HotShotEvent::ViewSyncFinalizeCertificate2Recv(certificate) => {
// Ignore certificate if it is for an older round
if certificate.view_number() < self.next_view {
- warn!("We're already in a higher round");
+ tracing::warn!("We're already in a higher round");
return None;
}
@@ -711,7 +709,7 @@ impl, V: Versions>
)
.await
{
- error!("Not valid view sync cert! {:?}", certificate.data());
+ tracing::error!("Not valid view sync cert! {:?}", certificate.data());
return None;
}
@@ -741,7 +739,7 @@ impl, V: Versions>
HotShotEvent::ViewSyncTrigger(view_number) => {
let view_number = *view_number;
if self.next_view != TYPES::View::new(*view_number) {
- error!("Unexpected view number to triger view sync");
+ tracing::error!("Unexpected view number to triger view sync");
return None;
}
@@ -757,7 +755,7 @@ impl, V: Versions>
)
.await
else {
- error!("Failed to sign pre commit vote!");
+ tracing::error!("Failed to sign pre commit vote!");
return None;
};
let message = GeneralConsensusMessage::::ViewSyncPreCommitVote(vote);
@@ -777,7 +775,7 @@ impl, V: Versions>
let timeout = self.view_sync_timeout;
async move {
async_sleep(timeout).await;
- warn!("Vote sending timed out in ViewSyncTrigger");
+ tracing::warn!("Vote sending timed out in ViewSyncTrigger");
broadcast_event(
Arc::new(HotShotEvent::ViewSyncTimeout(
TYPES::View::new(*next_view),
@@ -815,7 +813,7 @@ impl, V: Versions>
)
.await
else {
- error!("Failed to sign ViewSyncPreCommitData!");
+ tracing::error!("Failed to sign ViewSyncPreCommitData!");
return None;
};
let message =
@@ -843,7 +841,7 @@ impl, V: Versions>
let last_cert = last_seen_certificate.clone();
async move {
async_sleep(timeout).await;
- warn!(
+ tracing::warn!(
"Vote sending timed out in ViewSyncTimeout relay = {}",
relay
);
diff --git a/crates/task-impls/src/vote_collection.rs b/crates/task-impls/src/vote_collection.rs
index 533a96b719..4c685ca978 100644
--- a/crates/task-impls/src/vote_collection.rs
+++ b/crates/task-impls/src/vote_collection.rs
@@ -30,12 +30,9 @@ use hotshot_types::{
},
vote::{Certificate, HasViewNumber, Vote, VoteAccumulator},
};
-use tracing::{debug, error};
+use utils::anytrace::*;
-use crate::{
- events::{HotShotEvent, HotShotTaskCompleted},
- helpers::broadcast_event,
-};
+use crate::{events::HotShotEvent, helpers::broadcast_event};
/// Alias for a map of Vote Collectors
pub type VoteCollectorsMap =
@@ -74,8 +71,15 @@ pub trait AggregatableVote<
CERT: Certificate,
>
{
- /// return the leader for this votes in the given epoch
- fn leader(&self, membership: &TYPES::Membership, epoch: TYPES::Epoch) -> TYPES::SignatureKey;
+ /// return the leader for this votes
+ ///
+ /// # Errors
+ /// if the leader cannot be calculated
+ fn leader(
+ &self,
+ membership: &TYPES::Membership,
+ epoch: TYPES::Epoch,
+ ) -> Result;
/// return the Hotshot event for the completion of this CERT
fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent;
@@ -84,48 +88,54 @@ pub trait AggregatableVote<
impl<
TYPES: NodeType,
VOTE: Vote + AggregatableVote,
- CERT: Certificate + Debug,
+ CERT: Certificate + Clone + Debug,
V: Versions,
> VoteCollectionTaskState
{
/// Take one vote and accumulate it. Returns either the cert or the updated state
/// after the vote is accumulated
+ ///
+ /// # Errors
+ /// If are unable to accumulate the vote
#[allow(clippy::question_mark)]
pub async fn accumulate_vote(
&mut self,
vote: &VOTE,
event_stream: &Sender>>,
- ) -> Option {
- if vote.leader(&self.membership, self.epoch) != self.public_key {
- error!("Received vote for a view in which we were not the leader.");
- return None;
- }
-
- if vote.view_number() != self.view {
+ ) -> Result