Skip to content

Commit

Permalink
feat(f3): implement Finalize for EC backend (#4897)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 authored Oct 16, 2024
1 parent 49cdcb3 commit 7b3b90f
Show file tree
Hide file tree
Showing 19 changed files with 156 additions and 47 deletions.
2 changes: 2 additions & 0 deletions .config/forest.dic
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Enum
EOF
Ethereum
exa
F3
FFI
FIL
Filecoin/M
Filops
Expand Down
10 changes: 7 additions & 3 deletions f3-sidecar/ffi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ import (

func init() {
setGoDebugEnv()
logging.SetAllLoggers(logging.LevelWarn)
err := logging.SetLogLevel("f3/sidecar", "info")
logging.SetAllLoggers(logging.LevelInfo)
err := logging.SetLogLevel("dht", "error")
checkError(err)
err = logging.SetLogLevel("f3", "info")
err = logging.SetLogLevel("dht/RtRefreshManager", "warn")
checkError(err)
err = logging.SetLogLevel("net/identify", "error")
checkError(err)
err = logging.SetLogLevel("f3/sidecar", "debug")
checkError(err)
GoF3NodeImpl = &f3Impl{ctx: context.Background()}
}
Expand Down
12 changes: 9 additions & 3 deletions f3-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ import (
var logger = logging.Logger("f3/sidecar")

func main() {
logging.SetAllLoggers(logging.LevelError)
if err := logging.SetLogLevel("f3/sidecar", "debug"); err != nil {
logging.SetAllLoggers(logging.LevelInfo)
if err := logging.SetLogLevel("dht", "error"); err != nil {
panic(err)
}
if err := logging.SetLogLevel("dht/RtRefreshManager", "warn"); err != nil {
panic(err)
}
if err := logging.SetLogLevel("f3", "debug"); err != nil {
if err := logging.SetLogLevel("net/identify", "error"); err != nil {
panic(err)
}
if err := logging.SetLogLevel("f3/sidecar", "debug"); err != nil {
panic(err)
}

Expand Down
3 changes: 1 addition & 2 deletions f3-sidecar/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func run(ctx context.Context, rpcEndpoint string, f3RpcEndpoint string, initialP
} else {
m.BootstrapEpoch = bootstrapEpoch
}
m.CommitteeLookback = 5
// m.Pause = true
m.CommitteeLookback = manifest.DefaultCommitteeLookback

var manifestProvider manifest.ManifestProvider
switch manifestServerID, err := peer.Decode(manifestServer); {
Expand Down
3 changes: 2 additions & 1 deletion src/blocks/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ pub struct CachingBlockHeader {

impl PartialEq for CachingBlockHeader {
fn eq(&self, other: &Self) -> bool {
self.cid() == other.cid()
// Epoch check is redundant but cheap.
self.uncached.epoch == other.uncached.epoch && self.cid() == other.cid()
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ where
})
}

/// Returns a clone of the inner [`SyncNetworkContext`]
pub fn sync_network_context(&self) -> SyncNetworkContext<DB> {
self.network.clone()
}

/// Returns a clone of the bad blocks cache to be used outside of chain
/// sync.
pub fn bad_blocks_cloned(&self) -> Arc<BadBlockCache> {
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod bad_block_cache;
mod chain_muxer;
pub mod consensus;
mod metrics;
mod network_context;
pub mod network_context;
mod sync_state;
mod tipset_syncer;
mod validation;
Expand Down
8 changes: 6 additions & 2 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
/// Context used in chain sync to handle network requests.
/// This contains the peer manager, P2P service interface, and [`Blockstore`]
/// required to make network requests.
pub(in crate::chain_sync) struct SyncNetworkContext<DB> {
pub struct SyncNetworkContext<DB> {
/// Channel to send network messages through P2P service
network_send: flume::Sender<NetworkMessage>,

/// Manages peers to send requests to and updates request stats for the
/// respective peers.
peer_manager: Arc<PeerManager>,
Expand Down Expand Up @@ -141,6 +140,11 @@ where
self.peer_manager.as_ref()
}

/// Returns a reference to the channel for sending network messages through P2P service.
pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
&self.network_send
}

/// Send a `chain_exchange` request for only block headers (ignore
/// messages). If `peer_id` is `None`, requests will be sent to a set of
/// shuffled peers.
Expand Down
5 changes: 4 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ pub(super) async fn start(
)?;
let bad_blocks = chain_muxer.bad_blocks_cloned();
let sync_state = chain_muxer.sync_state_cloned();
let sync_network_context = chain_muxer.sync_network_context();
services.spawn(async { Err(anyhow::anyhow!("{}", chain_muxer.await)) });

if config.client.enable_health_check {
Expand Down Expand Up @@ -402,7 +403,7 @@ pub(super) async fn start(
bad_blocks,
sync_state,
eth_event_handler: Arc::new(EthEventHandler::new()),
network_send,
sync_network_context,
network_name,
start_time,
shutdown: shutdown_send,
Expand All @@ -414,6 +415,7 @@ pub(super) async fn start(
});

services.spawn_blocking({
let chain_config = chain_config.clone();
let default_f3_root = config.client.data_dir.join(format!("f3/{}", config.chain));
let crate::f3::F3Options {
chain_finality,
Expand All @@ -423,6 +425,7 @@ pub(super) async fn start(
} = crate::f3::get_f3_sidecar_params(&chain_config);
move || {
crate::f3::run_f3_sidecar_if_enabled(
&chain_config,
format!("http://{rpc_address}/rpc/v1"),
crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
initial_power_table.to_string(),
Expand Down
16 changes: 10 additions & 6 deletions src/f3/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

#![allow(clippy::too_many_arguments)]

#[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))]
mod go_ffi;
#[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))]
Expand All @@ -9,7 +11,7 @@ use go_ffi::*;
use cid::Cid;
use libp2p::PeerId;

use crate::{networks::ChainConfig, utils::misc::env::is_env_truthy};
use crate::{networks::ChainConfig, utils::misc::env::is_env_set_and_truthy};

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct F3Options {
Expand Down Expand Up @@ -85,6 +87,7 @@ pub fn get_f3_sidecar_params(chain_config: &ChainConfig) -> F3Options {
}

pub fn run_f3_sidecar_if_enabled(
chain_config: &ChainConfig,
_rpc_endpoint: String,
_f3_rpc_endpoint: String,
_initial_power_table: String,
Expand All @@ -93,7 +96,7 @@ pub fn run_f3_sidecar_if_enabled(
_f3_root: String,
_manifest_server: String,
) {
if is_sidecar_ffi_enabled() {
if is_sidecar_ffi_enabled(chain_config) {
#[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))]
{
GoF3NodeImpl::run(
Expand All @@ -109,10 +112,11 @@ pub fn run_f3_sidecar_if_enabled(
}
}

// Use opt-in mode for now. Consider switching to opt-out mode once F3 is shipped.
fn is_sidecar_ffi_enabled() -> bool {
// Opt-out building the F3 sidecar staticlib
let enabled = is_env_truthy("FOREST_F3_SIDECAR_FFI_ENABLED");
/// Whether F3 sidecar via FFI is enabled.
fn is_sidecar_ffi_enabled(chain_config: &ChainConfig) -> bool {
// Respect the environment variable when set, and fallback to chain config when not set.
let enabled =
is_env_set_and_truthy("FOREST_F3_SIDECAR_FFI_ENABLED").unwrap_or(chain_config.f3_enabled);
cfg_if::cfg_if! {
if #[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))] {
enabled
Expand Down
15 changes: 14 additions & 1 deletion src/networks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ pub struct ChainConfig {
pub breeze_gas_tamping_duration: i64,
// FIP0081 gradually comes into effect over this many epochs.
pub fip0081_ramp_duration_epochs: u64,
pub f3_enabled: bool,
// F3Consensus set whether F3 should checkpoint tipsets finalized by F3. This flag has no effect if F3 is not enabled.
pub f3_consensus: bool,
pub f3_bootstrap_epoch: i64,
pub f3_initial_power_table: Cid,
// This will likely be deprecated once F3 is fully bootstrapped to avoid single point network dependencies.
Expand All @@ -254,6 +257,8 @@ impl ChainConfig {
breeze_gas_tamping_duration: BREEZE_GAS_TAMPING_DURATION,
// 1 year on mainnet
fip0081_ramp_duration_epochs: 365 * EPOCHS_IN_DAY as u64,
f3_enabled: false,
f3_consensus: false,
f3_bootstrap_epoch: -1,
f3_initial_power_table: Default::default(),
f3_manifest_server: Some(
Expand Down Expand Up @@ -282,6 +287,10 @@ impl ChainConfig {
breeze_gas_tamping_duration: BREEZE_GAS_TAMPING_DURATION,
// 3 days on calibnet
fip0081_ramp_duration_epochs: 3 * EPOCHS_IN_DAY as u64,
// Enable after `f3_initial_power_table` is determined and set to avoid GC hell
// (state tree of epoch 2_081_674 - 900 has to be present in the database if `f3_initial_power_table` is not set)
f3_enabled: false,
f3_consensus: true,
// 2024-10-24T13:30:00Z
f3_bootstrap_epoch: 2_081_674,
f3_initial_power_table: Default::default(),
Expand All @@ -308,6 +317,8 @@ impl ChainConfig {
breeze_gas_tamping_duration: BREEZE_GAS_TAMPING_DURATION,
// Devnet ramp is 200 epochs in Lotus (subject to change).
fip0081_ramp_duration_epochs: env_or_default(ENV_PLEDGE_RULE_RAMP, 200),
f3_enabled: false,
f3_consensus: false,
f3_bootstrap_epoch: -1,
f3_initial_power_table: Default::default(),
f3_manifest_server: None,
Expand Down Expand Up @@ -335,7 +346,9 @@ impl ChainConfig {
ENV_PLEDGE_RULE_RAMP,
365 * EPOCHS_IN_DAY as u64,
),
f3_bootstrap_epoch: -1,
f3_enabled: true,
f3_consensus: true,
f3_bootstrap_epoch: 2760,
f3_initial_power_table: Default::default(),
f3_manifest_server: Some(
"12D3KooWJr9jy4ngtJNR7JC1xgLFra3DjEtyxskRYWvBK9TC3Yn6"
Expand Down
1 change: 1 addition & 0 deletions src/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ macro_rules! from2internal {
// TODO(forest): https://github.com/ChainSafe/forest/issues/3965
// Just mapping everything to an internal error is not appropriate
from2internal! {
String,
anyhow::Error,
base64::DecodeError,
cid::multibase::Error,
Expand Down
58 changes: 53 additions & 5 deletions src/rpc/methods/f3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ use crate::{
clock::ChainEpoch,
crypto::Signature,
},
utils::misc::env::is_env_set_and_truthy,
};
use ahash::{HashMap, HashSet};
use anyhow::Context as _;
use fil_actor_interface::{
convert::{
from_policy_v13_to_v10, from_policy_v13_to_v11, from_policy_v13_to_v12,
Expand All @@ -37,7 +39,7 @@ use libp2p::PeerId;
use num::Signed as _;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use std::{borrow::Cow, fmt::Display, str::FromStr as _, sync::Arc};
use std::{borrow::Cow, fmt::Display, num::NonZeroU64, str::FromStr as _, sync::Arc};

static F3_LEASE_MANAGER: Lazy<F3LeaseManager> = Lazy::new(Default::default);

Expand Down Expand Up @@ -423,7 +425,7 @@ impl RpcMethod<1> for ProtectPeer {
) -> Result<Self::Ok, ServerError> {
let peer_id = PeerId::from_str(&peer_id)?;
let (tx, rx) = flume::bounded(1);
ctx.network_send
ctx.network_send()
.send_async(NetworkMessage::JSONRPCRequest {
method: NetRPCMethods::ProtectPeer(tx, std::iter::once(peer_id).collect()),
})
Expand Down Expand Up @@ -463,10 +465,56 @@ impl RpcMethod<1> for Finalize {
type Ok = ();

async fn handle(
_: Ctx<impl Blockstore>,
(_tsk,): Self::Params,
ctx: Ctx<impl Blockstore>,
(f3_tsk,): Self::Params,
) -> Result<Self::Ok, ServerError> {
// TODO(hanabi1224): https://github.com/ChainSafe/forest/issues/4775
// Respect the environment variable when set, and fallback to chain config when not set.
let enabled = is_env_set_and_truthy("FOREST_F3_CONSENSUS_ENABLED")
.unwrap_or(ctx.chain_config().f3_consensus);
if !enabled {
return Ok(());
}

let tsk = f3_tsk.try_into()?;
let finalized_ts = match ctx.chain_index().load_tipset(&tsk)? {
Some(ts) => ts,
None => {
let ts = ctx
.sync_network_context
.chain_exchange_headers(None, &tsk, NonZeroU64::new(1).expect("Infallible"))
.await?
.first()
.cloned()
.with_context(|| {
format!("failed to get tipset via chain exchange. tsk: {tsk}")
})?;
ctx.chain_store().put_tipset(&ts)?;
ts
}
};
tracing::info!(
"F3 finalized tsk {} at epoch {}",
finalized_ts.key(),
finalized_ts.epoch()
);
let head = ctx.chain_store().heaviest_tipset();
// When finalized_ts is not part of the current chain,
// reset the current head to finalized_ts.
// Note that when finalized_ts is newer than head, we don't reset the head
// to allow the chain to catch up.
if head.epoch() >= finalized_ts.epoch()
&& !head
.chain_arc(ctx.store())
.take_while(|ts| ts.epoch() >= finalized_ts.epoch())
.any(|ts| ts == finalized_ts)
{
tracing::info!(
"F3 reset chain head to tsk {} at epoch {}",
finalized_ts.key(),
finalized_ts.epoch()
);
ctx.chain_store().set_heaviest_tipset(finalized_ts)?;
}
Ok(())
}
}
Expand Down
Loading

0 comments on commit 7b3b90f

Please sign in to comment.