Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: decompose payload build function #28

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 147 additions & 140 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use reth_primitives::{
constants::{BEACON_NONCE, EMPTY_OMMER_ROOT},
proofs, Block, BlockNumber, ChainSpec, Header, Receipt, SealedHeader, TransactionSigned, U256,
};
use reth_provider::{BlockReaderIdExt, CanonStateNotification, PostState, StateProviderFactory};
use reth_provider::{
BlockReaderIdExt, CanonStateNotification, PostState, StateProvider, StateProviderFactory,
};
use reth_revm::{
database::State,
env::tx_env_with_recovered,
Expand Down Expand Up @@ -144,142 +146,6 @@ impl<Client> Job<Client> {
}
}

impl<Client> Job<Client>
where
Client: StateProviderFactory,
{
fn build<I: Iterator<Item = (BundleId, BundleCompact)>>(
config: JobConfig,
client: Arc<Client>,
mut cached_reads: CachedReads,
bundles: I,
) -> Result<Payload, PayloadBuilderError> {
// init state w.r.t. config
let state = State::new(client.state_by_block_hash(config.parent.hash)?);
let mut db = CacheDB::new(cached_reads.as_db(&state));
let mut post_state = PostState::default();

let (cfg_env, block_env) = config
.attributes
.cfg_and_block_env(&config.chain, &config.parent);

let base_fee = block_env.basefee.to::<u64>();
let block_num = block_env.number.to::<u64>();
let block_gas_limit: u64 = block_env.gas_limit.try_into().unwrap_or(u64::MAX);

let mut total_fees = U256::ZERO;
let mut cumulative_gas_used = 0;
let mut txs = Vec::new();
let mut bundle_ids = HashSet::new();

for (id, bundle) in bundles {
// check gas for entire bundle
let bundle_gas_limit: u64 = bundle.0.iter().map(|tx| tx.gas_limit()).sum();
if cumulative_gas_used + bundle_gas_limit > block_gas_limit {
continue;
}

for tx in bundle.0.into_iter() {
let tx = tx.into_ecrecovered().ok_or(PayloadBuilderError::Internal(
RethError::Custom("unable to recover tx signer".into()),
))?;

// construct EVM
let tx_env = tx_env_with_recovered(&tx);
let env = Env {
cfg: cfg_env.clone(),
block: block_env.clone(),
tx: tx_env.clone(),
};
let mut evm = EVM::with_env(env);
evm.database(&mut db);

// NOTE: you can do far more reasonable error handling here. if a transaction
// within a bundle fails, we don't have to fail the entire payload.
let ResultAndState { result, state } = evm
.transact()
.map_err(PayloadBuilderError::EvmExecutionError)?;

commit_state_changes(&mut db, &mut post_state, block_num, state, true);

post_state.add_receipt(
block_num,
Receipt {
tx_type: tx.tx_type(),
success: result.is_success(),
cumulative_gas_used,
logs: result.logs().into_iter().map(into_reth_log).collect(),
},
);

cumulative_gas_used += result.gas_used();

let miner_fee = tx
.effective_tip_per_gas(base_fee)
.ok_or(InvalidTransaction::GasPriceLessThanBasefee)
.map_err(|err| PayloadBuilderError::EvmExecutionError(err.into()))?;
total_fees += U256::from(miner_fee) * U256::from(result.gas_used());

txs.push(tx.into_signed());
bundle_ids.insert(id);
}
}

// NOTE: here we assume post-shanghai
let balance_increments = post_block_withdrawals_balance_increments(
&config.chain,
config.attributes.timestamp,
&config.attributes.withdrawals,
);
for (address, increment) in balance_increments {
increment_account_balance(&mut db, &mut post_state, block_num, address, increment)?;
}
let withdrawals_root = proofs::calculate_withdrawals_root(&config.attributes.withdrawals);

// compute other accumulators
let receipts_root = post_state.receipts_root(block_num);
let logs_bloom = post_state.logs_bloom(block_num);
let transactions_root = proofs::calculate_transaction_root(&txs);
let state_root = state.state().state_root(post_state)?;

let header = Header {
parent_hash: config.parent.hash,
ommers_hash: EMPTY_OMMER_ROOT,
beneficiary: block_env.coinbase,
state_root,
transactions_root,
receipts_root,
withdrawals_root: Some(withdrawals_root),
logs_bloom,
timestamp: config.attributes.timestamp,
mix_hash: config.attributes.prev_randao,
nonce: BEACON_NONCE,
base_fee_per_gas: Some(base_fee),
number: config.parent.number + 1,
gas_limit: block_gas_limit,
difficulty: U256::ZERO,
gas_used: cumulative_gas_used,
extra_data: config.extra_data.to_le_bytes().into(),
};

let block = Block {
header,
body: txs,
ommers: vec![],
withdrawals: Some(config.attributes.withdrawals),
};
let block = block.seal_slow();

let payload = BuiltPayload::new(config.attributes.id, block, total_fees);
let payload = Payload {
inner: Arc::new(payload),
bundles: bundle_ids,
};

Ok(payload)
}
}

impl<Client> Future for Job<Client>
where
Client: StateProviderFactory + 'static,
Expand Down Expand Up @@ -342,7 +208,7 @@ where
let client = Arc::clone(&this.client);
let pending = task::spawn_blocking(move || {
// TODO: come back to this
Job::build(config, client, CachedReads::default(), bundles.into_iter())
build(config, client, CachedReads::default(), bundles.into_iter())
});

this.pending_payloads.push_back(pending);
Expand Down Expand Up @@ -415,7 +281,7 @@ where
return Ok(Arc::clone(&best.inner));
}

let empty = Job::build(
let empty = build(
self.config.clone(),
Arc::clone(&self.client),
CachedReads::default(),
Expand All @@ -434,7 +300,7 @@ where
let client = Arc::clone(&self.client);
let bundles = self.bundles.clone().into_iter();
task::spawn_blocking(move || {
let payload = Job::build(config, client, CachedReads::default(), bundles);
let payload = build(config, client, CachedReads::default(), bundles);
let _ = tx.send(payload);
});

Expand Down Expand Up @@ -593,3 +459,144 @@ where
))
}
}

fn build<Client: StateProviderFactory, I: Iterator<Item = (BundleId, BundleCompact)>>(
config: JobConfig,
client: Arc<Client>,
cached_reads: CachedReads,
bundles: I,
) -> Result<Payload, PayloadBuilderError> {
let state = client.state_by_block_hash(config.parent.hash)?;
let state = State::new(state);
build_on_state(config, state, cached_reads, bundles)
}

fn build_on_state<S: StateProvider, I: Iterator<Item = (BundleId, BundleCompact)>>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this its a good function

config: JobConfig,
state: State<S>,
mut cached_reads: CachedReads,
bundles: I,
) -> Result<Payload, PayloadBuilderError> {
let mut db = CacheDB::new(cached_reads.as_db(&state));
let mut post_state = PostState::default();

let (cfg_env, block_env) = config
.attributes
.cfg_and_block_env(&config.chain, &config.parent);
let base_fee = block_env.basefee.to::<u64>();
let block_num = block_env.number.to::<u64>();
let block_gas_limit: u64 = block_env.gas_limit.try_into().unwrap_or(u64::MAX);

let mut total_fees = U256::ZERO;
let mut cumulative_gas_used = 0;
let mut txs = Vec::new();
let mut bundle_ids = HashSet::new();

for (id, bundle) in bundles {
// check gas for entire bundle
let bundle_gas_limit: u64 = bundle.0.iter().map(|tx| tx.gas_limit()).sum();
if cumulative_gas_used + bundle_gas_limit > block_gas_limit {
continue;
}

for tx in bundle.0.into_iter() {
let tx =
tx.into_ecrecovered()
.ok_or(PayloadBuilderError::Internal(RethError::Custom(
"unable to recover tx signer".into(),
)))?;

// construct EVM
let tx_env = tx_env_with_recovered(&tx);
let env = Env {
cfg: cfg_env.clone(),
block: block_env.clone(),
tx: tx_env.clone(),
};
let mut evm = EVM::with_env(env);
evm.database(&mut db);

// NOTE: you can do far more reasonable error handling here. if a transaction
// within a bundle fails, we don't have to fail the entire payload.
let ResultAndState { result, state } = evm
.transact()
.map_err(PayloadBuilderError::EvmExecutionError)?;

commit_state_changes(&mut db, &mut post_state, block_num, state, true);

post_state.add_receipt(
block_num,
Receipt {
tx_type: tx.tx_type(),
success: result.is_success(),
cumulative_gas_used,
logs: result.logs().into_iter().map(into_reth_log).collect(),
},
);

cumulative_gas_used += result.gas_used();

let miner_fee = tx
.effective_tip_per_gas(base_fee)
.ok_or(InvalidTransaction::GasPriceLessThanBasefee)
.map_err(|err| PayloadBuilderError::EvmExecutionError(err.into()))?;
total_fees += U256::from(miner_fee) * U256::from(result.gas_used());

txs.push(tx.into_signed());
bundle_ids.insert(id);
}
}

// NOTE: here we assume post-shanghai
let balance_increments = post_block_withdrawals_balance_increments(
&config.chain,
config.attributes.timestamp,
&config.attributes.withdrawals,
);
for (address, increment) in balance_increments {
increment_account_balance(&mut db, &mut post_state, block_num, address, increment)?;
}
let withdrawals_root = proofs::calculate_withdrawals_root(&config.attributes.withdrawals);

// compute other accumulators
let receipts_root = post_state.receipts_root(block_num);
let logs_bloom = post_state.logs_bloom(block_num);
let transactions_root = proofs::calculate_transaction_root(&txs);
let state_root = state.state().state_root(post_state)?;

let header = Header {
parent_hash: config.parent.hash,
ommers_hash: EMPTY_OMMER_ROOT,
beneficiary: block_env.coinbase,
state_root,
transactions_root,
receipts_root,
withdrawals_root: Some(withdrawals_root),
logs_bloom,
timestamp: config.attributes.timestamp,
mix_hash: config.attributes.prev_randao,
nonce: BEACON_NONCE,
base_fee_per_gas: Some(base_fee),
number: config.parent.number + 1,
gas_limit: block_gas_limit,
difficulty: U256::ZERO,
gas_used: cumulative_gas_used,
extra_data: config.extra_data.to_le_bytes().into(),
};

let block = Block {
header,
body: txs,
ommers: vec![],
withdrawals: Some(config.attributes.withdrawals),
};
let block = block.seal_slow();

let payload = BuiltPayload::new(config.attributes.id, block, total_fees);
let payload = Payload {
inner: Arc::new(payload),
bundles: bundle_ids,
};

Ok(payload)
}