Skip to content

Commit

Permalink
Merge pull request #23 from KasarLabs/param
Browse files Browse the repository at this point in the history
fix(sync): Cleaned mc-sync isolating fetch process + added shared SyncStatus
  • Loading branch information
antiyro committed Mar 22, 2024
2 parents 5af24f9 + 02b45b6 commit 30c5917
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 314 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ git # Deoxys Changelog

## Next release

- fix(sync): Cleaned mc-sync isolating fetch process + added shared SyncStatus
- feat(self-hosted): host our own runner
- fix(deps): Removed unused dependencies
- feat(multi-trie): Added support for persistent storage tries
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where

let actual_fee = execution_infos.actual_fee.0.into();

let finality_status = if block_number <= mc_sync::l1::ETHEREUM_STATE_UPDATE.lock().unwrap().block_number {
let finality_status = if block_number <= mc_sync::l1::ETHEREUM_STATE_UPDATE.read().unwrap().block_number {
TransactionFinalityStatus::AcceptedOnL1
} else {
TransactionFinalityStatus::AcceptedOnL2
Expand Down
2 changes: 1 addition & 1 deletion crates/client/rpc/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub(crate) fn tx_conv(txs: &[mp_transactions::Transaction], tx_hashes: Vec<Field
}

pub(crate) fn status(block_number: u64) -> BlockStatus {
if block_number <= ETHEREUM_STATE_UPDATE.lock().unwrap().block_number {
if block_number <= ETHEREUM_STATE_UPDATE.read().unwrap().block_number {
BlockStatus::AcceptedOnL1
} else {
BlockStatus::AcceptedOnL2
Expand Down
238 changes: 238 additions & 0 deletions crates/client/sync/src/fetch/fetchers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
//! Contains the code required to fetch data from the network efficiently.
use std::sync::Arc;

use itertools::Itertools;
use mc_storage::OverrideHandle;
use mp_contract::class::{ContractClassData, ContractClassWrapper};
use mp_felt::Felt252Wrapper;
use mp_storage::StarknetStorageSchemaVersion;
use sp_blockchain::HeaderBackend;
use sp_core::{H160, H256};
use sp_runtime::generic::{Block, Header};
use sp_runtime::traits::{BlakeTwo256, Block as BlockT};
use sp_runtime::OpaqueExtrinsic;
use starknet_api::api_core::ClassHash;
use starknet_core::types::BlockId as BlockIdCore;
use starknet_ff::FieldElement;
use starknet_providers::sequencer::models as p;
use starknet_providers::sequencer::models::state_update::{DeclaredContract, DeployedContract};
use starknet_providers::sequencer::models::{BlockId, StateUpdate};
use starknet_providers::{Provider, ProviderError, SequencerGatewayProvider};
use tokio::task::JoinSet;
use tokio::time::Duration;
use url::Url;

use crate::l2::L2SyncError;
use crate::utility::{block_hash_deoxys, block_hash_substrate};

/// The configuration of the worker responsible for fetching new blocks and state updates from the
/// feeder.
#[derive(Clone, Debug)]
pub struct FetchConfig {
/// The URL of the sequencer gateway.
pub gateway: Url,
/// The URL of the feeder gateway.
pub feeder_gateway: Url,
/// The ID of the chain served by the sequencer gateway.
pub chain_id: starknet_ff::FieldElement,
/// The number of tasks spawned to fetch blocks and state updates.
pub workers: u32,
/// Whether to play a sound when a new block is fetched.
pub sound: bool,
/// The L1 contract core address
pub l1_core_address: H160,
/// Whether to check the root of the state update
pub verify: bool,
}

pub async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -> Result<p::Block, L2SyncError> {
let block = client.get_block(BlockId::Number(block_number)).await?;

Ok(block)
}

pub async fn fetch_block_and_updates<B, C>(
block_n: u64,
provider: &SequencerGatewayProvider,
overrides: &Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
client: &C,
) -> Result<(p::Block, StateUpdate, Vec<ContractClassData>), L2SyncError>
where
B: BlockT,
C: HeaderBackend<B>,
{
const MAX_RETRY: u32 = 15;
let mut attempt = 0;
let base_delay = Duration::from_secs(1);

loop {
log::debug!("fetch_block_and_updates {}", block_n);
let block = fetch_block(provider, block_n);
let state_update = fetch_state_and_class_update(provider, block_n, overrides, client);
let (block, state_update) = tokio::join!(block, state_update);
log::debug!("fetch_block_and_updates: done {block_n}");

match block.as_ref().err().or(state_update.as_ref().err()) {
Some(L2SyncError::Provider(ProviderError::RateLimited)) => {
log::debug!("The fetching process has been rate limited, retrying in {:?} seconds", base_delay);
attempt += 1;
if attempt >= MAX_RETRY {
return Err(L2SyncError::FetchRetryLimit);
}
// Exponential backoff with a cap on the delay
let delay = base_delay * 2_u32.pow(attempt - 1).min(6); // Cap to prevent overly long delays
tokio::time::sleep(delay).await;
}
_ => {
let (block, (state_update, class_update)) = (block?, state_update?);
return Ok((block, state_update, class_update));
}
}
}
}

pub async fn fetch_apply_genesis_block(config: FetchConfig) -> Result<mp_block::Block, String> {
let client = SequencerGatewayProvider::new(config.gateway.clone(), config.feeder_gateway.clone(), config.chain_id);
let block = client.get_block(BlockId::Number(0)).await.map_err(|e| format!("failed to get block: {e}"))?;

Ok(crate::convert::block(block).await)
}

#[allow(clippy::too_many_arguments)]
async fn fetch_state_and_class_update<B, C>(
provider: &SequencerGatewayProvider,
block_number: u64,
overrides: &Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
client: &C,
) -> Result<(StateUpdate, Vec<ContractClassData>), L2SyncError>
where
B: BlockT,
C: HeaderBackend<B>,
{
// Children tasks need StateUpdate as an Arc, because of task spawn 'static requirement
// We make an Arc, and then unwrap the StateUpdate out of the Arc
let state_update = Arc::new(fetch_state_update(provider, block_number).await?);
let class_update = fetch_class_update(provider, &state_update, overrides, block_number, client).await?;
let state_update = Arc::try_unwrap(state_update).expect("arc should not be aliased");

Ok((state_update, class_update))
}

/// retrieves state update from Starknet sequencer
async fn fetch_state_update(
provider: &SequencerGatewayProvider,
block_number: u64,
) -> Result<StateUpdate, L2SyncError> {
let state_update = provider.get_state_update(BlockId::Number(block_number)).await?;

Ok(state_update)
}

/// retrieves class updates from Starknet sequencer
async fn fetch_class_update<B, C>(
provider: &SequencerGatewayProvider,
state_update: &Arc<StateUpdate>,
overrides: &Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
block_number: u64,
client: &C,
) -> Result<Vec<ContractClassData>, L2SyncError>
where
B: BlockT,
C: HeaderBackend<B>,
{
// defaults to downloading ALL classes if a substrate block hash could not be determined
let missing_classes = match block_hash_substrate(client, block_number) {
Some(block_hash_substrate) => fetch_missing_classes(state_update, overrides, block_hash_substrate),
None => aggregate_classes(state_update),
};

let arc_provider = Arc::new(provider.clone());
let mut task_set = missing_classes.into_iter().fold(JoinSet::new(), |mut set, class_hash| {
let provider = Arc::clone(&arc_provider);
let state_update = Arc::clone(state_update);
let class_hash = *class_hash;
set.spawn(async move { fetch_class(class_hash, block_hash_deoxys(&state_update), &provider).await });
set
});

// WARNING: all class downloads will abort if even a single class fails to download.
let mut classes = vec![];
while let Some(res) = task_set.join_next().await {
classes.push(res.expect("Join error")?);
// No need to `abort_all()` the `task_set` in cast of errors, as dropping the `task_set`
// will abort all the tasks.
}

Ok(classes)
}

/// Downloads a class definition from the Starknet sequencer. Note that because
/// of the current type hell this needs to be converted into a blockifier equivalent
async fn fetch_class(
class_hash: FieldElement,
block_hash: FieldElement,
provider: &SequencerGatewayProvider,
) -> Result<ContractClassData, L2SyncError> {
// log::info!("💾 Downloading class {class_hash:#x}");
let core_class = provider.get_class(BlockIdCore::Hash(block_hash), class_hash).await?;

// Core classes have to be converted into Blockifier classes to gain support
// for Substrate [`Encode`] and [`Decode`] traits
Ok(ContractClassData {
// TODO: find a less roundabout way of converting from a Felt252Wrapper
hash: ClassHash(Felt252Wrapper::from(class_hash).into()),
// TODO: remove this expect when ContractClassWrapper::try_from does proper error handling using
// thiserror
contract_class: ContractClassWrapper::try_from(core_class).expect("converting contract class"),
})
}

/// Filters out class declarations in the Starknet sequencer state update
/// and retains only those which are not stored in the local Substrate db.
fn fetch_missing_classes<'a>(
state_update: &'a StateUpdate,
overrides: &Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
block_hash_substrate: H256,
) -> Vec<&'a FieldElement> {
aggregate_classes(state_update)
.into_iter()
.filter(|class_hash| is_missing_class(overrides, block_hash_substrate, Felt252Wrapper::from(**class_hash)))
.collect()
}

/// Retrieves all class hashes from state update. This includes newly deployed
/// contract class hashes, Sierra class hashes and Cairo class hashes
fn aggregate_classes(state_update: &StateUpdate) -> Vec<&FieldElement> {
std::iter::empty()
.chain(
state_update
.state_diff
.deployed_contracts
.iter()
.map(|DeployedContract { address: _, class_hash }| class_hash),
)
.chain(
state_update
.state_diff
.declared_classes
.iter()
.map(|DeclaredContract { class_hash, compiled_class_hash: _ }| class_hash),
)
.unique()
.collect()
}

/// Check if a class is stored in the local Substrate db.
///
/// Since a change in class definition will result in a change in class hash,
/// this means we only need to check for class hashes in the db.
fn is_missing_class(
overrides: &Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
block_hash_substrate: H256,
class_hash: Felt252Wrapper,
) -> bool {
overrides
.for_schema_version(&StarknetStorageSchemaVersion::Undefined)
.contract_class_by_class_hash(block_hash_substrate, ClassHash::from(class_hash))
.is_none()
}
1 change: 1 addition & 0 deletions crates/client/sync/src/fetch/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod fetchers;
9 changes: 5 additions & 4 deletions crates/client/sync/src/l1.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Contains the necessaries to perform an L1 verification of the state

use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};

use anyhow::Result;
use ethers::contract::{abigen, EthEvent};
Expand All @@ -23,7 +23,7 @@ use crate::utils::constant::LOG_STATE_UPDTATE_TOPIC;

lazy_static! {
/// Shared latest L2 state update verified on L1
pub static ref ETHEREUM_STATE_UPDATE: Arc<Mutex<L1StateUpdate>> = Arc::new(Mutex::new(L1StateUpdate {
pub static ref ETHEREUM_STATE_UPDATE: Arc<RwLock<L1StateUpdate>> = Arc::new(RwLock::new(L1StateUpdate {
block_number: u64::default(),
global_root: StarkHash::default(),
block_hash: StarkHash::default(),
Expand Down Expand Up @@ -192,14 +192,15 @@ pub fn update_l1(state_update: L1StateUpdate) {

{
let last_state_update = ETHEREUM_STATE_UPDATE.clone();
let mut new_state_update = last_state_update.lock().unwrap();
let mut new_state_update =
last_state_update.write().expect("Failed to acquire write lock on ETHEREUM_STATE_UPDATE");
*new_state_update = state_update.clone();
}
}

/// Verify the L1 state with the latest data
pub async fn verify_l1(state_update: L1StateUpdate, rpc_port: u16) -> Result<(), String> {
let starknet_state_block_number = STARKNET_STATE_UPDATE.lock().map_err(|e| e.to_string())?.block_number;
let starknet_state_block_number = STARKNET_STATE_UPDATE.read().map_err(|e| e.to_string())?.block_number;

// Check if the node reached the latest verified state on Ethereum
if state_update.block_number > starknet_state_block_number {
Expand Down
Loading

0 comments on commit 30c5917

Please sign in to comment.