Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: decompose building into execution and packaging #32

Merged
merged 3 commits into from
Aug 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 155 additions & 75 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures_util::{stream::Fuse, FutureExt, Stream, StreamExt};
use reth_interfaces::Error as RethError;
use reth_payload_builder::{
database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive,
PayloadBuilderAttributes, PayloadJob, PayloadJobGenerator,
error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes,
PayloadJob, PayloadJobGenerator,
};
use reth_primitives::{
constants::{BEACON_NONCE, EMPTY_OMMER_ROOT},
proofs, Block, BlockNumber, ChainSpec, Header, Receipt, SealedHeader, TransactionSigned, U256,
proofs, Block, BlockNumber, ChainSpec, Header, Receipt, SealedBlock, SealedHeader,
TransactionSigned, U256,
};
use reth_provider::{
BlockReaderIdExt, CanonStateNotification, PostState, StateProvider, StateProviderFactory,
Expand All @@ -27,8 +28,8 @@ use reth_revm::{
},
into_reth_log,
revm::{
db::CacheDB,
primitives::{result::InvalidTransaction, Env, ResultAndState},
db::{CacheDB, DatabaseRef},
primitives::{result::InvalidTransaction, BlockEnv, CfgEnv, Env, ResultAndState},
EVM,
},
};
Expand Down Expand Up @@ -208,7 +209,7 @@ where
let client = Arc::clone(&this.client);
let pending = task::spawn_blocking(move || {
// TODO: come back to this
build(config, client, CachedReads::default(), bundles.into_iter())
build(config, client, bundles.into_iter())
});

this.pending_payloads.push_back(pending);
Expand Down Expand Up @@ -284,7 +285,6 @@ where
let empty = build(
self.config.clone(),
Arc::clone(&self.client),
CachedReads::default(),
self.bundles.clone().into_iter(),
)?;
Ok(empty.inner)
Expand All @@ -300,7 +300,7 @@ where
let client = Arc::clone(&self.client);
let bundles = self.bundles.clone().into_iter();
task::spawn_blocking(move || {
let payload = build(config, client, CachedReads::default(), bundles);
let payload = build(config, client, bundles);
let _ = tx.send(payload);
});

Expand Down Expand Up @@ -463,27 +463,26 @@ where
fn build<Client: StateProviderFactory, I: Iterator<Item = (BundleId, BundleCompact)>>(
config: JobConfig,
client: Arc<Client>,
cached_reads: CachedReads,
bundles: I,
) -> Result<Payload, PayloadBuilderError> {
let state = client.state_by_block_hash(config.parent.hash)?;
let state = State::new(state);
build_on_state(config, state, cached_reads, bundles)
build_on_state(config, state, bundles)
}

fn build_on_state<S: StateProvider, I: Iterator<Item = (BundleId, BundleCompact)>>(
config: JobConfig,
state: State<S>,
mut cached_reads: CachedReads,
bundles: I,
) -> Result<Payload, PayloadBuilderError> {
let mut db = CacheDB::new(cached_reads.as_db(&state));
let state = Arc::new(state);
let mut db = CacheDB::new(Arc::clone(&state));

let mut post_state = PostState::default();

let (cfg_env, block_env) = config
.attributes
.cfg_and_block_env(&config.chain, &config.parent);
let base_fee = block_env.basefee.to::<u64>();
let block_num = block_env.number.to::<u64>();
let block_gas_limit: u64 = block_env.gas_limit.try_into().unwrap_or(u64::MAX);

Expand All @@ -499,50 +498,33 @@ fn build_on_state<S: StateProvider, I: Iterator<Item = (BundleId, BundleCompact)
continue;
}

for tx in bundle.0.into_iter() {
let tx =
tx.into_ecrecovered()
.ok_or(PayloadBuilderError::Internal(RethError::Custom(
"unable to recover tx signer".into(),
)))?;

// construct EVM
let tx_env = tx_env_with_recovered(&tx);
let env = Env {
cfg: cfg_env.clone(),
block: block_env.clone(),
tx: tx_env.clone(),
};
let mut evm = EVM::with_env(env);
evm.database(&mut db);

// NOTE: you can do far more reasonable error handling here. if a transaction
// within a bundle fails, we don't have to fail the entire payload.
let ResultAndState { result, state } = evm
.transact()
.map_err(PayloadBuilderError::EvmExecutionError)?;

commit_state_changes(&mut db, &mut post_state, block_num, state, true);

cumulative_gas_used += result.gas_used();

post_state.add_receipt(
block_num,
Receipt {
tx_type: tx.tx_type(),
success: result.is_success(),
cumulative_gas_used,
logs: result.logs().into_iter().map(into_reth_log).collect(),
},
);

let miner_fee = tx
.effective_tip_per_gas(base_fee)
.ok_or(InvalidTransaction::GasPriceLessThanBasefee)
.map_err(|err| PayloadBuilderError::EvmExecutionError(err.into()))?;
total_fees += U256::from(miner_fee) * U256::from(result.gas_used());

txs.push(tx.into_signed());
// clone the database, so that if the execution fails, then we can keep the state of the
// database as if the execution was never attempted. currently, there is no way to roll
// back the database state if the execution fails part-way through.
//
// NOTE: we will be able to refactor to do rollbacks after the following is merged:
// https://github.com/paradigmxyz/reth/pull/3512
let mut tmp_db = db.clone();

let mut bundle = bundle.0;
let execution = execute(
&mut tmp_db,
&cfg_env,
&block_env,
cumulative_gas_used,
bundle.clone().into_iter(),
);

if let Ok(execution) = execution {
total_fees += execution.total_fees;
cumulative_gas_used = execution.cumulative_gas_used;

txs.append(&mut bundle);
post_state.extend(execution.post_state);

db = tmp_db;
} else {
continue;
}

// add bundle to set of executed bundles
Expand All @@ -558,47 +540,145 @@ fn build_on_state<S: StateProvider, I: Iterator<Item = (BundleId, BundleCompact)
for (address, increment) in balance_increments {
increment_account_balance(&mut db, &mut post_state, block_num, address, increment)?;
}
let withdrawals_root = proofs::calculate_withdrawals_root(&config.attributes.withdrawals);

// compute other accumulators
let block = package_block(
state.state(),
&config.attributes,
config.extra_data,
&block_env,
txs,
post_state,
cumulative_gas_used,
)?;

let payload = BuiltPayload::new(config.attributes.id, block, total_fees);
let payload = Payload {
inner: Arc::new(payload),
bundles: bundle_ids,
};

Ok(payload)
}

struct Execution {
post_state: PostState,
cumulative_gas_used: u64,
total_fees: U256,
}

fn execute<DB, I>(
db: &mut CacheDB<DB>,
cfg_env: &CfgEnv,
block_env: &BlockEnv,
mut cumulative_gas_used: u64,
txs: I,
) -> Result<Execution, PayloadBuilderError>
where
DB: DatabaseRef<Error = RethError>,
I: Iterator<Item = TransactionSigned>,
{
let base_fee = block_env.basefee.to::<u64>();
let block_num = block_env.number.to::<u64>();

let mut total_fees = U256::ZERO;
let mut post_state = PostState::default();

for tx in txs {
let tx = tx
.into_ecrecovered()
.ok_or(PayloadBuilderError::Internal(RethError::Custom(
"unable to recover tx signer".into(),
)))?;

// construct EVM
let tx_env = tx_env_with_recovered(&tx);
let env = Env {
cfg: cfg_env.clone(),
block: block_env.clone(),
tx: tx_env.clone(),
};
let mut evm = EVM::with_env(env);
evm.database(&mut *db);

// execute transaction
let ResultAndState { result, state } = evm
.transact()
.map_err(PayloadBuilderError::EvmExecutionError)?;

// commit changes to DB and post state
commit_state_changes(db, &mut post_state, block_num, state, true);

cumulative_gas_used += result.gas_used();

post_state.add_receipt(
block_num,
Receipt {
tx_type: tx.tx_type(),
success: result.is_success(),
cumulative_gas_used,
logs: result.logs().into_iter().map(into_reth_log).collect(),
},
);

let miner_fee = tx
.effective_tip_per_gas(base_fee)
.ok_or(InvalidTransaction::GasPriceLessThanBasefee)
.map_err(|err| PayloadBuilderError::EvmExecutionError(err.into()))?;
total_fees += U256::from(miner_fee) * U256::from(result.gas_used());
}

Ok(Execution {
post_state,
cumulative_gas_used,
total_fees,
})
}

fn package_block<S: StateProvider>(
state: S,
attributes: &PayloadBuilderAttributes,
extra_data: u128,
block_env: &BlockEnv,
txs: Vec<TransactionSigned>,
post_state: PostState,
cumulative_gas_used: u64,
) -> Result<SealedBlock, PayloadBuilderError> {
let base_fee = block_env.basefee.to::<u64>();
let block_num = block_env.number.to::<u64>();
let block_gas_limit: u64 = block_env.gas_limit.try_into().unwrap_or(u64::MAX);

// compute accumulators
let receipts_root = post_state.receipts_root(block_num);
let logs_bloom = post_state.logs_bloom(block_num);
let transactions_root = proofs::calculate_transaction_root(&txs);
let state_root = state.state().state_root(post_state)?;
let withdrawals_root = proofs::calculate_withdrawals_root(&attributes.withdrawals);
let state_root = state.state_root(post_state)?;

let header = Header {
parent_hash: config.parent.hash,
parent_hash: attributes.parent,
ommers_hash: EMPTY_OMMER_ROOT,
beneficiary: block_env.coinbase,
state_root,
transactions_root,
receipts_root,
withdrawals_root: Some(withdrawals_root),
logs_bloom,
timestamp: config.attributes.timestamp,
mix_hash: config.attributes.prev_randao,
timestamp: attributes.timestamp,
mix_hash: attributes.prev_randao,
nonce: BEACON_NONCE,
base_fee_per_gas: Some(base_fee),
number: config.parent.number + 1,
number: block_num,
gas_limit: block_gas_limit,
difficulty: U256::ZERO,
gas_used: cumulative_gas_used,
extra_data: config.extra_data.to_le_bytes().into(),
extra_data: extra_data.to_le_bytes().into(),
};

let block = Block {
header,
body: txs,
ommers: vec![],
withdrawals: Some(config.attributes.withdrawals),
withdrawals: Some(attributes.withdrawals.clone()),
};
let block = block.seal_slow();

let payload = BuiltPayload::new(config.attributes.id, block, total_fees);
let payload = Payload {
inner: Arc::new(payload),
bundles: bundle_ids,
};

Ok(payload)
Ok(block.seal_slow())
}