From 951d5f208e5d16a5d95878dd345a8bd2a4144aa7 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 27 Aug 2024 12:58:16 +0300 Subject: [PATCH] feat(vm): Extract oneshot VM executor interface (#2671) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Extracts oneshot VM executor from the API server crate. ## Why ❔ Simplifies reasoning about oneshot VM execution and its maintenance. Allows for alternative implementations. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- core/lib/multivm/src/tracers/mod.rs | 4 +- core/lib/multivm/src/tracers/validator/mod.rs | 6 +- core/lib/vm_interface/src/lib.rs | 5 +- .../vm_interface/src/types/inputs/l2_block.rs | 19 +- core/lib/vm_interface/src/types/inputs/mod.rs | 14 +- .../api_server/src/execution_sandbox/apply.rs | 676 ++++++++++-------- .../src/execution_sandbox/execute.rs | 221 +++--- .../api_server/src/execution_sandbox/mod.rs | 57 +- .../src/execution_sandbox/testonly.rs | 104 ++- .../api_server/src/execution_sandbox/tests.rs | 58 +- .../src/execution_sandbox/tracers.rs | 41 +- .../src/execution_sandbox/validate.rs | 112 +-- core/node/api_server/src/tx_sender/mod.rs | 106 ++- core/node/api_server/src/tx_sender/tests.rs | 8 +- .../api_server/src/web3/namespaces/debug.rs | 41 +- .../api_server/src/web3/namespaces/eth.rs | 13 +- core/node/api_server/src/web3/testonly.rs | 13 +- core/node/api_server/src/web3/tests/mod.rs | 49 +- core/node/api_server/src/web3/tests/vm.rs | 123 ++-- 19 files changed, 859 insertions(+), 811 deletions(-) diff --git a/core/lib/multivm/src/tracers/mod.rs b/core/lib/multivm/src/tracers/mod.rs index 0a6517a6cd2..69501cf3988 100644 --- a/core/lib/multivm/src/tracers/mod.rs +++ b/core/lib/multivm/src/tracers/mod.rs @@ -3,7 +3,9 @@ pub use self::{ multivm_dispatcher::TracerDispatcher, prestate_tracer::PrestateTracer, storage_invocation::StorageInvocations, - validator::{ValidationError, ValidationTracer, ValidationTracerParams}, + validator::{ + ValidationError, ValidationTracer, ValidationTracerParams, ViolatedValidationRule, + }, }; mod call_tracer; diff --git a/core/lib/multivm/src/tracers/validator/mod.rs b/core/lib/multivm/src/tracers/validator/mod.rs index a91006368b6..307256792cf 100644 --- a/core/lib/multivm/src/tracers/validator/mod.rs +++ b/core/lib/multivm/src/tracers/validator/mod.rs @@ -10,13 +10,11 @@ use zksync_types::{ }; use zksync_utils::{be_bytes_to_safe_address, u256_to_account_address, u256_to_h256}; -pub use crate::tracers::validator::types::{ValidationError, ValidationTracerParams}; +use self::types::{NewTrustedValidationItems, ValidationTracerMode}; +pub use self::types::{ValidationError, ValidationTracerParams, ViolatedValidationRule}; use crate::{ glue::tracers::IntoOldVmTracer, interface::storage::{StoragePtr, WriteStorage}, - tracers::validator::types::{ - NewTrustedValidationItems, ValidationTracerMode, ViolatedValidationRule, - }, }; mod types; diff --git a/core/lib/vm_interface/src/lib.rs b/core/lib/vm_interface/src/lib.rs index b2b7d6484da..120812842ad 100644 --- a/core/lib/vm_interface/src/lib.rs +++ b/core/lib/vm_interface/src/lib.rs @@ -23,7 +23,10 @@ pub use crate::{ BytecodeCompressionError, Halt, TxRevertReason, VmRevertReason, VmRevertReasonParsingError, }, - inputs::{L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode, VmExecutionMode}, + inputs::{ + L1BatchEnv, L2BlockEnv, OneshotEnv, StoredL2BlockEnv, SystemEnv, TxExecutionMode, + VmExecutionMode, + }, outputs::{ BootloaderMemory, Call, CallType, CircuitStatistic, CompressedBytecodeInfo, CurrentExecutionState, DeduplicatedWritesMetrics, ExecutionResult, FinishedL1Batch, diff --git a/core/lib/vm_interface/src/types/inputs/l2_block.rs b/core/lib/vm_interface/src/types/inputs/l2_block.rs index 7c9a028bbad..b081dfbdeac 100644 --- a/core/lib/vm_interface/src/types/inputs/l2_block.rs +++ b/core/lib/vm_interface/src/types/inputs/l2_block.rs @@ -10,12 +10,21 @@ pub struct L2BlockEnv { } impl L2BlockEnv { - pub fn from_l2_block_data(miniblock_execution_data: &L2BlockExecutionData) -> Self { + pub fn from_l2_block_data(execution_data: &L2BlockExecutionData) -> Self { Self { - number: miniblock_execution_data.number.0, - timestamp: miniblock_execution_data.timestamp, - prev_block_hash: miniblock_execution_data.prev_block_hash, - max_virtual_blocks_to_create: miniblock_execution_data.virtual_blocks, + number: execution_data.number.0, + timestamp: execution_data.timestamp, + prev_block_hash: execution_data.prev_block_hash, + max_virtual_blocks_to_create: execution_data.virtual_blocks, } } } + +/// Current block information stored in the system context contract. Can be used to set up +/// oneshot transaction / call execution. +#[derive(Debug, Clone, Copy)] +pub struct StoredL2BlockEnv { + pub number: u32, + pub timestamp: u64, + pub txs_rolling_hash: H256, +} diff --git a/core/lib/vm_interface/src/types/inputs/mod.rs b/core/lib/vm_interface/src/types/inputs/mod.rs index 1d2c49cdfa1..4801c4d88b5 100644 --- a/core/lib/vm_interface/src/types/inputs/mod.rs +++ b/core/lib/vm_interface/src/types/inputs/mod.rs @@ -1,7 +1,7 @@ pub use self::{ execution_mode::VmExecutionMode, l1_batch_env::L1BatchEnv, - l2_block::L2BlockEnv, + l2_block::{L2BlockEnv, StoredL2BlockEnv}, system_env::{SystemEnv, TxExecutionMode}, }; @@ -9,3 +9,15 @@ mod execution_mode; mod l1_batch_env; mod l2_block; mod system_env; + +/// Full environment for oneshot transaction / call execution. +#[derive(Debug)] +pub struct OneshotEnv { + /// System environment. + pub system: SystemEnv, + /// Part of the environment specific to an L1 batch. + pub l1_batch: L1BatchEnv, + /// Part of the environment representing the current L2 block. Can be used to override storage slots + /// in the system context contract, which are set from `L1BatchEnv.first_l2_block` by default. + pub current_block: Option, +} diff --git a/core/node/api_server/src/execution_sandbox/apply.rs b/core/node/api_server/src/execution_sandbox/apply.rs index c0c8398f690..0ec857e1e2b 100644 --- a/core/node/api_server/src/execution_sandbox/apply.rs +++ b/core/node/api_server/src/execution_sandbox/apply.rs @@ -9,16 +9,19 @@ use std::time::{Duration, Instant}; use anyhow::Context as _; +use async_trait::async_trait; use tokio::runtime::Handle; -use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError}; +use zksync_dal::{Connection, Core, CoreDal, DalError}; use zksync_multivm::{ interface::{ storage::{ReadStorage, StoragePtr, StorageView, WriteStorage}, - L1BatchEnv, L2BlockEnv, SystemEnv, VmInterface, + BytecodeCompressionError, L1BatchEnv, L2BlockEnv, OneshotEnv, StoredL2BlockEnv, SystemEnv, + TxExecutionMode, VmExecutionMode, VmExecutionResultAndLogs, VmInterface, }, - utils::adjust_pubdata_price_for_tx, + tracers::StorageInvocations, + utils::{adjust_pubdata_price_for_tx, get_eth_call_gas_limit}, vm_latest::{constants::BATCH_COMPUTATIONAL_GAS_LIMIT, HistoryDisabled}, - VmInstance, + MultiVMTracer, MultiVmTracerPointer, VmInstance, }; use zksync_state::PostgresStorage; use zksync_system_constants::{ @@ -26,7 +29,7 @@ use zksync_system_constants::{ SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, ZKPORTER_IS_AVAILABLE, }; use zksync_types::{ - api::{self, state_override::StateOverride}, + api, block::{pack_block_info, unpack_block_info, L2BlockHasher}, fee_model::BatchFeeInput, get_nonce_key, @@ -37,179 +40,250 @@ use zksync_types::{ use zksync_utils::{h256_to_u256, time::seconds_since_epoch, u256_to_h256}; use super::{ - storage::StorageWithOverrides, vm_metrics::{self, SandboxStage, SANDBOX_METRICS}, - BlockArgs, TxExecutionArgs, TxSharedArgs, VmPermit, + ApiTracer, BlockArgs, OneshotExecutor, TxExecutionArgs, TxSetupArgs, }; -type VmStorageView<'a> = StorageView>>; -type BoxedVm<'a> = Box>, HistoryDisabled>>; +pub(super) async fn prepare_env_and_storage( + mut connection: Connection<'static, Core>, + setup_args: TxSetupArgs, + block_args: &BlockArgs, +) -> anyhow::Result<(OneshotEnv, PostgresStorage<'static>)> { + let initialization_stage = SANDBOX_METRICS.sandbox[&SandboxStage::Initialization].start(); -#[derive(Debug)] -struct Sandbox<'a> { - system_env: SystemEnv, - l1_batch_env: L1BatchEnv, - execution_args: &'a TxExecutionArgs, - l2_block_info_to_reset: Option, - storage_view: VmStorageView<'a>, -} - -impl<'a> Sandbox<'a> { - async fn new( - mut connection: Connection<'a, Core>, - shared_args: TxSharedArgs, - execution_args: &'a TxExecutionArgs, - block_args: BlockArgs, - state_override: &StateOverride, - ) -> anyhow::Result> { - let resolve_started_at = Instant::now(); - let resolved_block_info = block_args - .resolve_block_info(&mut connection) - .await - .with_context(|| format!("cannot resolve block numbers for {block_args:?}"))?; - let resolve_time = resolve_started_at.elapsed(); - // We don't want to emit too many logs. - if resolve_time > Duration::from_millis(10) { - tracing::debug!("Resolved block numbers (took {resolve_time:?})"); - } - - if block_args.resolves_to_latest_sealed_l2_block() { - shared_args - .caches - .schedule_values_update(resolved_block_info.state_l2_block_number); - } - - let (next_l2_block_info, l2_block_info_to_reset) = Self::load_l2_block_info( - &mut connection, - block_args.is_pending_l2_block(), - &resolved_block_info, - ) - .await?; - - let storage = PostgresStorage::new_async( - Handle::current(), - connection, - resolved_block_info.state_l2_block_number, - false, - ) + let resolve_started_at = Instant::now(); + let resolved_block_info = block_args + .resolve_block_info(&mut connection) .await - .context("cannot create `PostgresStorage`")? - .with_caches(shared_args.caches.clone()); - - let storage_with_overrides = StorageWithOverrides::new(storage, state_override); - let storage_view = StorageView::new(storage_with_overrides); - let (system_env, l1_batch_env) = Self::prepare_env( - shared_args, - execution_args, - &resolved_block_info, - next_l2_block_info, - ); + .with_context(|| format!("cannot resolve block numbers for {block_args:?}"))?; + let resolve_time = resolve_started_at.elapsed(); + // We don't want to emit too many logs. + if resolve_time > Duration::from_millis(10) { + tracing::debug!("Resolved block numbers (took {resolve_time:?})"); + } - Ok(Self { - system_env, - l1_batch_env, - storage_view, - execution_args, - l2_block_info_to_reset, - }) + if block_args.resolves_to_latest_sealed_l2_block() { + setup_args + .caches + .schedule_values_update(resolved_block_info.state_l2_block_number); } - async fn load_l2_block_info( - connection: &mut Connection<'_, Core>, - is_pending_block: bool, - resolved_block_info: &ResolvedBlockInfo, - ) -> anyhow::Result<(L2BlockEnv, Option)> { - let mut l2_block_info_to_reset = None; - let current_l2_block_info = StoredL2BlockInfo::new( - connection, - resolved_block_info.state_l2_block_number, - Some(resolved_block_info.state_l2_block_hash), - ) + let (next_block, current_block) = load_l2_block_info( + &mut connection, + block_args.is_pending_l2_block(), + &resolved_block_info, + ) + .await?; + + let storage = PostgresStorage::new_async( + Handle::current(), + connection, + resolved_block_info.state_l2_block_number, + false, + ) + .await + .context("cannot create `PostgresStorage`")? + .with_caches(setup_args.caches.clone()); + + let (system, l1_batch) = prepare_env(setup_args, &resolved_block_info, next_block); + + let env = OneshotEnv { + system, + l1_batch, + current_block, + }; + initialization_stage.observe(); + Ok((env, storage)) +} + +async fn load_l2_block_info( + connection: &mut Connection<'_, Core>, + is_pending_block: bool, + resolved_block_info: &ResolvedBlockInfo, +) -> anyhow::Result<(L2BlockEnv, Option)> { + let mut current_block = None; + let next_block = read_stored_l2_block(connection, resolved_block_info.state_l2_block_number) .await .context("failed reading L2 block info")?; - let next_l2_block_info = if is_pending_block { - L2BlockEnv { - number: current_l2_block_info.l2_block_number + 1, - timestamp: resolved_block_info.l1_batch_timestamp, - prev_block_hash: current_l2_block_info.l2_block_hash, - // For simplicity, we assume each L2 block create one virtual block. - // This may be wrong only during transition period. - max_virtual_blocks_to_create: 1, - } - } else if current_l2_block_info.l2_block_number == 0 { - // Special case: - // - For environments, where genesis block was created before virtual block upgrade it doesn't matter what we put here. - // - Otherwise, we need to put actual values here. We cannot create next L2 block with block_number=0 and `max_virtual_blocks_to_create=0` - // because of SystemContext requirements. But, due to intrinsics of SystemContext, block.number still will be resolved to 0. - L2BlockEnv { - number: 1, - timestamp: 0, - prev_block_hash: L2BlockHasher::legacy_hash(L2BlockNumber(0)), - max_virtual_blocks_to_create: 1, - } - } else { - // We need to reset L2 block info in storage to process transaction in the current block context. - // Actual resetting will be done after `storage_view` is created. - let prev_l2_block_info = StoredL2BlockInfo::new( - connection, - resolved_block_info.state_l2_block_number - 1, - None, - ) + let next_block = if is_pending_block { + L2BlockEnv { + number: next_block.number + 1, + timestamp: resolved_block_info.l1_batch_timestamp, + prev_block_hash: resolved_block_info.state_l2_block_hash, + // For simplicity, we assume each L2 block create one virtual block. + // This may be wrong only during transition period. + max_virtual_blocks_to_create: 1, + } + } else if next_block.number == 0 { + // Special case: + // - For environments, where genesis block was created before virtual block upgrade it doesn't matter what we put here. + // - Otherwise, we need to put actual values here. We cannot create next L2 block with block_number=0 and `max_virtual_blocks_to_create=0` + // because of SystemContext requirements. But, due to intrinsics of SystemContext, block.number still will be resolved to 0. + L2BlockEnv { + number: 1, + timestamp: 0, + prev_block_hash: L2BlockHasher::legacy_hash(L2BlockNumber(0)), + max_virtual_blocks_to_create: 1, + } + } else { + // We need to reset L2 block info in storage to process transaction in the current block context. + // Actual resetting will be done after `storage_view` is created. + let prev_block_number = resolved_block_info.state_l2_block_number - 1; + let prev_l2_block = read_stored_l2_block(connection, prev_block_number) .await .context("failed reading previous L2 block info")?; - l2_block_info_to_reset = Some(prev_l2_block_info); - L2BlockEnv { - number: current_l2_block_info.l2_block_number, - timestamp: current_l2_block_info.l2_block_timestamp, - prev_block_hash: prev_l2_block_info.l2_block_hash, - max_virtual_blocks_to_create: 1, - } + let mut prev_block_hash = connection + .blocks_web3_dal() + .get_l2_block_hash(prev_block_number) + .await + .map_err(DalError::generalize)?; + if prev_block_hash.is_none() { + // We might need to load the previous block hash from the snapshot recovery metadata + let snapshot_recovery = connection + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await + .map_err(DalError::generalize)?; + prev_block_hash = snapshot_recovery.and_then(|recovery| { + (recovery.l2_block_number == prev_block_number).then_some(recovery.l2_block_hash) + }); + } + + current_block = Some(prev_l2_block); + L2BlockEnv { + number: next_block.number, + timestamp: next_block.timestamp, + prev_block_hash: prev_block_hash.with_context(|| { + format!("missing hash for previous L2 block #{prev_block_number}") + })?, + max_virtual_blocks_to_create: 1, + } + }; + + Ok((next_block, current_block)) +} + +fn prepare_env( + setup_args: TxSetupArgs, + resolved_block_info: &ResolvedBlockInfo, + next_block: L2BlockEnv, +) -> (SystemEnv, L1BatchEnv) { + let TxSetupArgs { + execution_mode, + operator_account, + fee_input, + base_system_contracts, + validation_computational_gas_limit, + chain_id, + enforced_base_fee, + .. + } = setup_args; + + // In case we are executing in a past block, we'll use the historical fee data. + let fee_input = resolved_block_info + .historical_fee_input + .unwrap_or(fee_input); + let system_env = SystemEnv { + zk_porter_available: ZKPORTER_IS_AVAILABLE, + version: resolved_block_info.protocol_version, + base_system_smart_contracts: base_system_contracts + .get_by_protocol_version(resolved_block_info.protocol_version), + bootloader_gas_limit: BATCH_COMPUTATIONAL_GAS_LIMIT, + execution_mode, + default_validation_computational_gas_limit: validation_computational_gas_limit, + chain_id, + }; + let l1_batch_env = L1BatchEnv { + previous_batch_hash: None, + number: resolved_block_info.vm_l1_batch_number, + timestamp: resolved_block_info.l1_batch_timestamp, + fee_input, + fee_account: *operator_account.address(), + enforced_base_fee, + first_l2_block: next_block, + }; + (system_env, l1_batch_env) +} + +// public for testing purposes +#[derive(Debug)] +pub(super) struct VmSandbox { + vm: Box>, + storage_view: StoragePtr>, + transaction: Transaction, +} + +impl VmSandbox { + /// This method is blocking. + pub fn new(storage: S, mut env: OneshotEnv, execution_args: TxExecutionArgs) -> Self { + let mut storage_view = StorageView::new(storage); + Self::setup_storage_view(&mut storage_view, &execution_args, env.current_block); + + let protocol_version = env.system.version; + if execution_args.adjust_pubdata_price { + env.l1_batch.fee_input = adjust_pubdata_price_for_tx( + env.l1_batch.fee_input, + execution_args.transaction.gas_per_pubdata_byte_limit(), + env.l1_batch.enforced_base_fee.map(U256::from), + protocol_version.into(), + ); }; - Ok((next_l2_block_info, l2_block_info_to_reset)) + let storage_view = storage_view.to_rc_ptr(); + let vm = Box::new(VmInstance::new_with_specific_version( + env.l1_batch, + env.system, + storage_view.clone(), + protocol_version.into_api_vm_version(), + )); + + Self { + vm, + storage_view, + transaction: execution_args.transaction, + } } /// This method is blocking. - fn setup_storage_view(&mut self, tx: &Transaction) { + fn setup_storage_view( + storage_view: &mut StorageView, + execution_args: &TxExecutionArgs, + current_block: Option, + ) { let storage_view_setup_started_at = Instant::now(); - if let Some(nonce) = self.execution_args.enforced_nonce { - let nonce_key = get_nonce_key(&tx.initiator_account()); - let full_nonce = self.storage_view.read_value(&nonce_key); + if let Some(nonce) = execution_args.enforced_nonce { + let nonce_key = get_nonce_key(&execution_args.transaction.initiator_account()); + let full_nonce = storage_view.read_value(&nonce_key); let (_, deployment_nonce) = decompose_full_nonce(h256_to_u256(full_nonce)); let enforced_full_nonce = nonces_to_full_nonce(U256::from(nonce.0), deployment_nonce); - self.storage_view - .set_value(nonce_key, u256_to_h256(enforced_full_nonce)); + storage_view.set_value(nonce_key, u256_to_h256(enforced_full_nonce)); } - let payer = tx.payer(); + let payer = execution_args.transaction.payer(); let balance_key = storage_key_for_eth_balance(&payer); - let mut current_balance = h256_to_u256(self.storage_view.read_value(&balance_key)); - current_balance += self.execution_args.added_balance; - self.storage_view - .set_value(balance_key, u256_to_h256(current_balance)); + let mut current_balance = h256_to_u256(storage_view.read_value(&balance_key)); + current_balance += execution_args.added_balance; + storage_view.set_value(balance_key, u256_to_h256(current_balance)); // Reset L2 block info if necessary. - if let Some(l2_block_info_to_reset) = self.l2_block_info_to_reset { + if let Some(current_block) = current_block { let l2_block_info_key = StorageKey::new( AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, ); - let l2_block_info = pack_block_info( - l2_block_info_to_reset.l2_block_number as u64, - l2_block_info_to_reset.l2_block_timestamp, - ); - self.storage_view - .set_value(l2_block_info_key, u256_to_h256(l2_block_info)); + let l2_block_info = + pack_block_info(current_block.number.into(), current_block.timestamp); + storage_view.set_value(l2_block_info_key, u256_to_h256(l2_block_info)); let l2_block_txs_rolling_hash_key = StorageKey::new( AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, ); - self.storage_view.set_value( + storage_view.set_value( l2_block_txs_rolling_hash_key, - l2_block_info_to_reset.txs_rolling_hash, + current_block.txs_rolling_hash, ); } @@ -220,201 +294,155 @@ impl<'a> Sandbox<'a> { } } - fn prepare_env( - shared_args: TxSharedArgs, - execution_args: &TxExecutionArgs, - resolved_block_info: &ResolvedBlockInfo, - next_l2_block_info: L2BlockEnv, - ) -> (SystemEnv, L1BatchEnv) { - let TxSharedArgs { - operator_account, - fee_input, - base_system_contracts, - validation_computational_gas_limit, - chain_id, - .. - } = shared_args; - - // In case we are executing in a past block, we'll use the historical fee data. - let fee_input = resolved_block_info - .historical_fee_input - .unwrap_or(fee_input); - let system_env = SystemEnv { - zk_porter_available: ZKPORTER_IS_AVAILABLE, - version: resolved_block_info.protocol_version, - base_system_smart_contracts: base_system_contracts - .get_by_protocol_version(resolved_block_info.protocol_version), - bootloader_gas_limit: BATCH_COMPUTATIONAL_GAS_LIMIT, - execution_mode: execution_args.execution_mode, - default_validation_computational_gas_limit: validation_computational_gas_limit, - chain_id, - }; - let l1_batch_env = L1BatchEnv { - previous_batch_hash: None, - number: resolved_block_info.vm_l1_batch_number, - timestamp: resolved_block_info.l1_batch_timestamp, - fee_input, - fee_account: *operator_account.address(), - enforced_base_fee: execution_args.enforced_base_fee, - first_l2_block: next_l2_block_info, - }; - (system_env, l1_batch_env) + fn wrap_tracers( + tracers: Vec, + env: &OneshotEnv, + missed_storage_invocation_limit: usize, + ) -> Vec, HistoryDisabled>> { + let storage_invocation_tracer = StorageInvocations::new(missed_storage_invocation_limit); + let protocol_version = env.system.version; + tracers + .into_iter() + .map(|tracer| tracer.into_boxed(protocol_version)) + .chain([storage_invocation_tracer.into_tracer_pointer()]) + .collect() } - /// This method is blocking. - fn into_vm( - mut self, - tx: &Transaction, - adjust_pubdata_price: bool, - ) -> (BoxedVm<'a>, StoragePtr>) { - self.setup_storage_view(tx); - let protocol_version = self.system_env.version; - if adjust_pubdata_price { - self.l1_batch_env.fee_input = adjust_pubdata_price_for_tx( - self.l1_batch_env.fee_input, - tx.gas_per_pubdata_byte_limit(), - self.l1_batch_env.enforced_base_fee.map(U256::from), - protocol_version.into(), - ); - }; + pub(super) fn apply(mut self, apply_fn: F) -> T + where + F: FnOnce(&mut VmInstance, Transaction) -> T, + { + let tx_id = format!( + "{:?}-{}", + self.transaction.initiator_account(), + self.transaction.nonce().unwrap_or(Nonce(0)) + ); - let storage_view = self.storage_view.to_rc_ptr(); - let vm = Box::new(VmInstance::new_with_specific_version( - self.l1_batch_env, - self.system_env, - storage_view.clone(), - protocol_version.into_api_vm_version(), - )); + let execution_latency = SANDBOX_METRICS.sandbox[&SandboxStage::Execution].start(); + let result = apply_fn(&mut *self.vm, self.transaction); + let vm_execution_took = execution_latency.observe(); - (vm, storage_view) + let memory_metrics = self.vm.record_vm_memory_metrics(); + vm_metrics::report_vm_memory_metrics( + &tx_id, + &memory_metrics, + vm_execution_took, + self.storage_view.as_ref().borrow_mut().metrics(), + ); + result } } -#[allow(clippy::too_many_arguments)] -pub(super) fn apply_vm_in_sandbox( - vm_permit: VmPermit, - shared_args: TxSharedArgs, - // If `true`, then the batch's L1/pubdata gas price will be adjusted so that the transaction's gas per pubdata limit is <= - // to the one in the block. This is often helpful in case we want the transaction validation to work regardless of the - // current L1 prices for gas or pubdata. - adjust_pubdata_price: bool, - execution_args: &TxExecutionArgs, - connection_pool: &ConnectionPool, - tx: Transaction, - block_args: BlockArgs, // Block arguments for the transaction. - state_override: Option, - apply: impl FnOnce( - &mut VmInstance>, HistoryDisabled>, - Transaction, - ProtocolVersionId, - ) -> T, -) -> anyhow::Result { - let stage_started_at = Instant::now(); - let span = tracing::debug_span!("initialization").entered(); - - let rt_handle = vm_permit.rt_handle(); - let connection = rt_handle - .block_on(connection_pool.connection_tagged("api")) - .context("failed acquiring DB connection")?; - let connection_acquire_time = stage_started_at.elapsed(); - // We don't want to emit too many logs. - if connection_acquire_time > Duration::from_millis(10) { - tracing::debug!("Obtained connection (took {connection_acquire_time:?})"); - } - - let sandbox = rt_handle.block_on(Sandbox::new( - connection, - shared_args, - execution_args, - block_args, - state_override.as_ref().unwrap_or(&StateOverride::default()), - ))?; - let protocol_version = sandbox.system_env.version; - let (mut vm, storage_view) = sandbox.into_vm(&tx, adjust_pubdata_price); - - SANDBOX_METRICS.sandbox[&SandboxStage::Initialization].observe(stage_started_at.elapsed()); - span.exit(); - - let tx_id = format!( - "{:?}-{}", - tx.initiator_account(), - tx.nonce().unwrap_or(Nonce(0)) - ); - - let execution_latency = SANDBOX_METRICS.sandbox[&SandboxStage::Execution].start(); - let result = apply(&mut vm, tx, protocol_version); - let vm_execution_took = execution_latency.observe(); - - let memory_metrics = vm.record_vm_memory_metrics(); - vm_metrics::report_vm_memory_metrics( - &tx_id, - &memory_metrics, - vm_execution_took, - storage_view.as_ref().borrow_mut().metrics(), - ); - Ok(result) +/// Main [`OneshotExecutor`] implementation used by the API server. +#[derive(Debug, Default)] +pub struct MainOneshotExecutor { + missed_storage_invocation_limit: usize, } -#[derive(Debug, Clone, Copy)] -struct StoredL2BlockInfo { - l2_block_number: u32, - l2_block_timestamp: u64, - l2_block_hash: H256, - txs_rolling_hash: H256, +impl MainOneshotExecutor { + /// Creates a new executor with the specified limit of cache misses for storage read operations (an anti-DoS measure). + /// The limit is applied for calls and gas estimations, but not during transaction validation. + pub fn new(missed_storage_invocation_limit: usize) -> Self { + Self { + missed_storage_invocation_limit, + } + } } -impl StoredL2BlockInfo { - /// If `l2_block_hash` is `None`, it needs to be fetched from the storage. - async fn new( - connection: &mut Connection<'_, Core>, - l2_block_number: L2BlockNumber, - l2_block_hash: Option, - ) -> anyhow::Result { - let l2_block_info_key = StorageKey::new( - AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), - SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, - ); - let l2_block_info = connection - .storage_web3_dal() - .get_historical_value_unchecked(l2_block_info_key.hashed_key(), l2_block_number) - .await - .context("failed reading L2 block info from VM state")?; - let (l2_block_number_from_state, l2_block_timestamp) = - unpack_block_info(h256_to_u256(l2_block_info)); +#[async_trait] +impl OneshotExecutor for MainOneshotExecutor +where + S: ReadStorage + Send + 'static, +{ + type Tracers = Vec; - let l2_block_txs_rolling_hash_key = StorageKey::new( - AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), - SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, - ); - let txs_rolling_hash = connection - .storage_web3_dal() - .get_historical_value_unchecked( - l2_block_txs_rolling_hash_key.hashed_key(), - l2_block_number, - ) - .await - .context("failed reading transaction rolling hash from VM state")?; + async fn inspect_transaction( + &self, + storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + tracers: Self::Tracers, + ) -> anyhow::Result { + let missed_storage_invocation_limit = match env.system.execution_mode { + // storage accesses are not limited for tx validation + TxExecutionMode::VerifyExecute => usize::MAX, + TxExecutionMode::EthCall | TxExecutionMode::EstimateFee => { + self.missed_storage_invocation_limit + } + }; - let l2_block_hash = if let Some(hash) = l2_block_hash { - hash - } else { - connection - .blocks_web3_dal() - .get_l2_block_hash(l2_block_number) - .await - .map_err(DalError::generalize)? - .with_context(|| format!("L2 block #{l2_block_number} not present in storage"))? + tokio::task::spawn_blocking(move || { + let tracers = VmSandbox::wrap_tracers(tracers, &env, missed_storage_invocation_limit); + let executor = VmSandbox::new(storage, env, args); + executor.apply(|vm, transaction| { + vm.push_transaction(transaction); + vm.inspect(tracers.into(), VmExecutionMode::OneTx) + }) + }) + .await + .context("VM execution panicked") + } + + async fn inspect_transaction_with_bytecode_compression( + &self, + storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + tracers: Self::Tracers, + ) -> anyhow::Result<( + Result<(), BytecodeCompressionError>, + VmExecutionResultAndLogs, + )> { + let missed_storage_invocation_limit = match env.system.execution_mode { + // storage accesses are not limited for tx validation + TxExecutionMode::VerifyExecute => usize::MAX, + TxExecutionMode::EthCall | TxExecutionMode::EstimateFee => { + self.missed_storage_invocation_limit + } }; - Ok(Self { - l2_block_number: l2_block_number_from_state as u32, - l2_block_timestamp, - l2_block_hash, - txs_rolling_hash, + tokio::task::spawn_blocking(move || { + let tracers = VmSandbox::wrap_tracers(tracers, &env, missed_storage_invocation_limit); + let executor = VmSandbox::new(storage, env, args); + executor.apply(|vm, transaction| { + vm.inspect_transaction_with_bytecode_compression(tracers.into(), transaction, true) + }) }) + .await + .context("VM execution panicked") } } +async fn read_stored_l2_block( + connection: &mut Connection<'_, Core>, + l2_block_number: L2BlockNumber, +) -> anyhow::Result { + let l2_block_info_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, + ); + let l2_block_info = connection + .storage_web3_dal() + .get_historical_value_unchecked(l2_block_info_key.hashed_key(), l2_block_number) + .await?; + let (l2_block_number_from_state, timestamp) = unpack_block_info(h256_to_u256(l2_block_info)); + + let l2_block_txs_rolling_hash_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, + ); + let txs_rolling_hash = connection + .storage_web3_dal() + .get_historical_value_unchecked(l2_block_txs_rolling_hash_key.hashed_key(), l2_block_number) + .await?; + + Ok(StoredL2BlockEnv { + number: l2_block_number_from_state as u32, + timestamp, + txs_rolling_hash, + }) +} + #[derive(Debug)] pub(crate) struct ResolvedBlockInfo { state_l2_block_number: L2BlockNumber, @@ -442,7 +470,19 @@ impl BlockArgs { ) } - pub(crate) async fn resolve_block_info( + pub(crate) async fn default_eth_call_gas( + &self, + connection: &mut Connection<'_, Core>, + ) -> anyhow::Result { + let protocol_version = self + .resolve_block_info(connection) + .await + .context("failed to resolve block info")? + .protocol_version; + Ok(get_eth_call_gas_limit(protocol_version.into()).into()) + } + + async fn resolve_block_info( &self, connection: &mut Connection<'_, Core>, ) -> anyhow::Result { diff --git a/core/node/api_server/src/execution_sandbox/execute.rs b/core/node/api_server/src/execution_sandbox/execute.rs index 741bcaea18f..086a75c81de 100644 --- a/core/node/api_server/src/execution_sandbox/execute.rs +++ b/core/node/api_server/src/execution_sandbox/execute.rs @@ -1,80 +1,80 @@ //! Implementation of "executing" methods, e.g. `eth_call`. -use anyhow::Context as _; -use tracing::{span, Level}; -use zksync_dal::{ConnectionPool, Core}; -use zksync_multivm::{ - interface::{ - TransactionExecutionMetrics, TxExecutionMode, VmExecutionResultAndLogs, VmInterface, - }, - tracers::StorageInvocations, - MultiVMTracer, +use async_trait::async_trait; +use zksync_dal::{Connection, Core}; +use zksync_multivm::interface::{ + storage::ReadStorage, BytecodeCompressionError, OneshotEnv, TransactionExecutionMetrics, + VmExecutionResultAndLogs, }; use zksync_types::{ - l2::L2Tx, transaction_request::CallOverrides, ExecuteTransactionCommon, Nonce, + api::state_override::StateOverride, l2::L2Tx, ExecuteTransactionCommon, Nonce, PackedEthSignature, Transaction, U256, }; use super::{ - apply, testonly::MockTransactionExecutor, vm_metrics, ApiTracer, BlockArgs, TxSharedArgs, - VmPermit, + apply::{self, MainOneshotExecutor}, + storage::StorageWithOverrides, + testonly::MockOneshotExecutor, + vm_metrics, ApiTracer, BlockArgs, OneshotExecutor, TxSetupArgs, VmPermit, }; -use crate::execution_sandbox::api::state_override::StateOverride; +/// Executor-independent arguments necessary to for oneshot transaction execution. +/// +/// # Developer guidelines +/// +/// Please don't add fields that duplicate `SystemEnv` or `L1BatchEnv` information, since both of these +/// are also provided to an executor. #[derive(Debug)] pub(crate) struct TxExecutionArgs { - pub execution_mode: TxExecutionMode, + /// Transaction / call itself. + pub transaction: Transaction, + /// Nonce override for the initiator account. pub enforced_nonce: Option, + /// Balance added to the initiator account. pub added_balance: U256, - pub enforced_base_fee: Option, - pub missed_storage_invocation_limit: usize, + /// If `true`, then the batch's L1 / pubdata gas price will be adjusted so that the transaction's gas per pubdata limit is <= + /// to the one in the block. This is often helpful in case we want the transaction validation to work regardless of the + /// current L1 prices for gas or pubdata. + pub adjust_pubdata_price: bool, } impl TxExecutionArgs { - pub fn for_validation(tx: &L2Tx) -> Self { + pub fn for_validation(tx: L2Tx) -> Self { Self { - execution_mode: TxExecutionMode::VerifyExecute, enforced_nonce: Some(tx.nonce()), added_balance: U256::zero(), - enforced_base_fee: Some(tx.common_data.fee.max_fee_per_gas.as_u64()), - missed_storage_invocation_limit: usize::MAX, + adjust_pubdata_price: true, + transaction: tx.into(), } } - fn for_eth_call( - enforced_base_fee: Option, - vm_execution_cache_misses_limit: Option, - ) -> Self { - let missed_storage_invocation_limit = vm_execution_cache_misses_limit.unwrap_or(usize::MAX); + pub fn for_eth_call(mut call: L2Tx) -> Self { + if call.common_data.signature.is_empty() { + call.common_data.signature = PackedEthSignature::default().serialize_packed().into(); + } + Self { - execution_mode: TxExecutionMode::EthCall, enforced_nonce: None, added_balance: U256::zero(), - enforced_base_fee, - missed_storage_invocation_limit, + adjust_pubdata_price: false, + transaction: call.into(), } } - pub fn for_gas_estimate( - vm_execution_cache_misses_limit: Option, - tx: &Transaction, - base_fee: u64, - ) -> Self { - let missed_storage_invocation_limit = vm_execution_cache_misses_limit.unwrap_or(usize::MAX); + pub fn for_gas_estimate(transaction: Transaction) -> Self { // For L2 transactions we need to explicitly put enough balance into the account of the users // while for L1->L2 transactions the `to_mint` field plays this role - let added_balance = match &tx.common_data { + let added_balance = match &transaction.common_data { ExecuteTransactionCommon::L2(data) => data.fee.gas_limit * data.fee.max_fee_per_gas, ExecuteTransactionCommon::L1(_) => U256::zero(), ExecuteTransactionCommon::ProtocolUpgrade(_) => U256::zero(), }; Self { - execution_mode: TxExecutionMode::EstimateFee, - missed_storage_invocation_limit, - enforced_nonce: tx.nonce(), + enforced_nonce: transaction.nonce(), added_balance, - enforced_base_fee: Some(base_fee), + adjust_pubdata_price: true, + transaction, } } } @@ -92,68 +92,40 @@ pub(crate) struct TransactionExecutionOutput { /// Executor of transactions. #[derive(Debug)] pub(crate) enum TransactionExecutor { - Real, + Real(MainOneshotExecutor), #[doc(hidden)] // Intended for tests only - Mock(MockTransactionExecutor), + Mock(MockOneshotExecutor), } impl TransactionExecutor { + pub fn real(missed_storage_invocation_limit: usize) -> Self { + Self::Real(MainOneshotExecutor::new(missed_storage_invocation_limit)) + } + /// This method assumes that (block with number `resolved_block_number` is present in DB) /// or (`block_id` is `pending` and block with number `resolved_block_number - 1` is present in DB) #[allow(clippy::too_many_arguments)] - #[tracing::instrument(skip_all)] + #[tracing::instrument(level = "debug", skip_all)] pub async fn execute_tx_in_sandbox( &self, vm_permit: VmPermit, - shared_args: TxSharedArgs, - // If `true`, then the batch's L1/pubdata gas price will be adjusted so that the transaction's gas per pubdata limit is <= - // to the one in the block. This is often helpful in case we want the transaction validation to work regardless of the - // current L1 prices for gas or pubdata. - adjust_pubdata_price: bool, + setup_args: TxSetupArgs, execution_args: TxExecutionArgs, - connection_pool: ConnectionPool, - tx: Transaction, + connection: Connection<'static, Core>, block_args: BlockArgs, state_override: Option, - custom_tracers: Vec, + tracers: Vec, ) -> anyhow::Result { - if let Self::Mock(mock_executor) = self { - return mock_executor.execute_tx(&tx, &block_args); - } - - let total_factory_deps = tx.execute.factory_deps.len() as u16; - - let (published_bytecodes, execution_result) = tokio::task::spawn_blocking(move || { - let span = span!(Level::DEBUG, "execute_in_sandbox").entered(); - let result = apply::apply_vm_in_sandbox( - vm_permit, - shared_args, - adjust_pubdata_price, - &execution_args, - &connection_pool, - tx, - block_args, - state_override, - |vm, tx, _| { - let storage_invocation_tracer = - StorageInvocations::new(execution_args.missed_storage_invocation_limit); - let custom_tracers: Vec<_> = custom_tracers - .into_iter() - .map(|tracer| tracer.into_boxed()) - .chain(vec![storage_invocation_tracer.into_tracer_pointer()]) - .collect(); - vm.inspect_transaction_with_bytecode_compression( - custom_tracers.into(), - tx, - true, - ) - }, - ); - span.exit(); - result - }) - .await - .context("transaction execution panicked")??; + let total_factory_deps = execution_args.transaction.execute.factory_deps.len() as u16; + let (env, storage) = + apply::prepare_env_and_storage(connection, setup_args, &block_args).await?; + let state_override = state_override.unwrap_or_default(); + let storage = StorageWithOverrides::new(storage, &state_override); + + let (published_bytecodes, execution_result) = self + .inspect_transaction_with_bytecode_compression(storage, env, execution_args, tracers) + .await?; + drop(vm_permit); let metrics = vm_metrics::collect_tx_execution_metrics(total_factory_deps, &execution_result); @@ -163,42 +135,53 @@ impl TransactionExecutor { are_published_bytecodes_ok: published_bytecodes.is_ok(), }) } +} - #[allow(clippy::too_many_arguments)] - pub async fn execute_tx_eth_call( +#[async_trait] +impl OneshotExecutor for TransactionExecutor +where + S: ReadStorage + Send + 'static, +{ + type Tracers = Vec; + + async fn inspect_transaction( &self, - vm_permit: VmPermit, - shared_args: TxSharedArgs, - connection_pool: ConnectionPool, - call_overrides: CallOverrides, - mut tx: L2Tx, - block_args: BlockArgs, - vm_execution_cache_misses_limit: Option, - custom_tracers: Vec, - state_override: Option, + storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + tracers: Self::Tracers, ) -> anyhow::Result { - let execution_args = TxExecutionArgs::for_eth_call( - call_overrides.enforced_base_fee, - vm_execution_cache_misses_limit, - ); - - if tx.common_data.signature.is_empty() { - tx.common_data.signature = PackedEthSignature::default().serialize_packed().into(); + match self { + Self::Real(executor) => { + executor + .inspect_transaction(storage, env, args, tracers) + .await + } + Self::Mock(executor) => executor.inspect_transaction(storage, env, args, ()).await, } + } - let output = self - .execute_tx_in_sandbox( - vm_permit, - shared_args, - false, - execution_args, - connection_pool, - tx.into(), - block_args, - state_override, - custom_tracers, - ) - .await?; - Ok(output.vm) + async fn inspect_transaction_with_bytecode_compression( + &self, + storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + tracers: Self::Tracers, + ) -> anyhow::Result<( + Result<(), BytecodeCompressionError>, + VmExecutionResultAndLogs, + )> { + match self { + Self::Real(executor) => { + executor + .inspect_transaction_with_bytecode_compression(storage, env, args, tracers) + .await + } + Self::Mock(executor) => { + executor + .inspect_transaction_with_bytecode_compression(storage, env, args, ()) + .await + } + } } } diff --git a/core/node/api_server/src/execution_sandbox/mod.rs b/core/node/api_server/src/execution_sandbox/mod.rs index f7c876679cb..f2a3f0e5f8c 100644 --- a/core/node/api_server/src/execution_sandbox/mod.rs +++ b/core/node/api_server/src/execution_sandbox/mod.rs @@ -4,9 +4,13 @@ use std::{ }; use anyhow::Context as _; +use async_trait::async_trait; use rand::{thread_rng, Rng}; -use tokio::runtime::Handle; use zksync_dal::{pruning_dal::PruningInfo, Connection, Core, CoreDal, DalError}; +use zksync_multivm::interface::{ + storage::ReadStorage, BytecodeCompressionError, OneshotEnv, TxExecutionMode, + VmExecutionResultAndLogs, +}; use zksync_state::PostgresStorageCaches; use zksync_types::{ api, fee_model::BatchFeeInput, AccountTreeId, Address, L1BatchNumber, L2BlockNumber, L2ChainId, @@ -40,17 +44,9 @@ mod vm_metrics; /// as a proof that the caller obtained a token from `VmConcurrencyLimiter`, #[derive(Debug, Clone)] pub struct VmPermit { - /// A handle to the runtime that is used to query the VM storage. - rt_handle: Handle, _permit: Arc, } -impl VmPermit { - fn rt_handle(&self) -> &Handle { - &self.rt_handle - } -} - /// Barrier-like synchronization primitive allowing to close a [`VmConcurrencyLimiter`] it's attached to /// so that it doesn't issue new permits, and to wait for all permits to drop. #[derive(Debug, Clone)] @@ -103,7 +99,6 @@ impl VmConcurrencyBarrier { pub struct VmConcurrencyLimiter { /// Semaphore that limits the number of concurrent VM executions. limiter: Arc, - rt_handle: Handle, } impl VmConcurrencyLimiter { @@ -116,7 +111,6 @@ impl VmConcurrencyLimiter { let this = Self { limiter: Arc::clone(&limiter), - rt_handle: Handle::current(), }; let barrier = VmConcurrencyBarrier { limiter, @@ -144,7 +138,6 @@ impl VmConcurrencyLimiter { } Some(VmPermit { - rt_handle: self.rt_handle.clone(), _permit: Arc::new(permit), }) } @@ -163,9 +156,10 @@ async fn get_pending_state( Ok((block_id, resolved_block_number)) } -/// Arguments for VM execution not specific to a particular transaction. +/// Arguments for VM execution necessary to set up storage and environment. #[derive(Debug, Clone)] -pub(crate) struct TxSharedArgs { +pub(crate) struct TxSetupArgs { + pub execution_mode: TxExecutionMode, pub operator_account: AccountTreeId, pub fee_input: BatchFeeInput, pub base_system_contracts: MultiVMBaseSystemContracts, @@ -173,12 +167,17 @@ pub(crate) struct TxSharedArgs { pub validation_computational_gas_limit: u32, pub chain_id: L2ChainId, pub whitelisted_tokens_for_aa: Vec
, + pub enforced_base_fee: Option, } -impl TxSharedArgs { +impl TxSetupArgs { #[cfg(test)] - pub fn mock(base_system_contracts: MultiVMBaseSystemContracts) -> Self { + pub fn mock( + execution_mode: TxExecutionMode, + base_system_contracts: MultiVMBaseSystemContracts, + ) -> Self { Self { + execution_mode, operator_account: AccountTreeId::default(), fee_input: BatchFeeInput::l1_pegged(55, 555), base_system_contracts, @@ -186,6 +185,7 @@ impl TxSharedArgs { validation_computational_gas_limit: u32::MAX, chain_id: L2ChainId::default(), whitelisted_tokens_for_aa: Vec::new(), + enforced_base_fee: None, } } } @@ -417,3 +417,28 @@ impl BlockArgs { ) } } + +/// VM executor capable of executing isolated transactions / calls (as opposed to batch execution). +#[async_trait] +trait OneshotExecutor { + type Tracers: Default; + + async fn inspect_transaction( + &self, + storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + tracers: Self::Tracers, + ) -> anyhow::Result; + + async fn inspect_transaction_with_bytecode_compression( + &self, + storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + tracers: Self::Tracers, + ) -> anyhow::Result<( + Result<(), BytecodeCompressionError>, + VmExecutionResultAndLogs, + )>; +} diff --git a/core/node/api_server/src/execution_sandbox/testonly.rs b/core/node/api_server/src/execution_sandbox/testonly.rs index 59fa2e38db7..d9d60f52415 100644 --- a/core/node/api_server/src/execution_sandbox/testonly.rs +++ b/core/node/api_server/src/execution_sandbox/testonly.rs @@ -1,24 +1,24 @@ use std::fmt; +use async_trait::async_trait; +#[cfg(test)] +use zksync_multivm::interface::ExecutionResult; use zksync_multivm::interface::{ - ExecutionResult, TransactionExecutionMetrics, VmExecutionResultAndLogs, + storage::ReadStorage, BytecodeCompressionError, OneshotEnv, TxExecutionMode, + VmExecutionResultAndLogs, }; -use zksync_types::{l2::L2Tx, ExecuteTransactionCommon, Transaction}; +use zksync_types::Transaction; -use super::{ - execute::{TransactionExecutionOutput, TransactionExecutor}, - validate::ValidationError, - BlockArgs, -}; +use super::{execute::TransactionExecutor, OneshotExecutor, TxExecutionArgs}; -type TxResponseFn = dyn Fn(&Transaction, &BlockArgs) -> VmExecutionResultAndLogs + Send + Sync; +type TxResponseFn = dyn Fn(&Transaction, &OneshotEnv) -> VmExecutionResultAndLogs + Send + Sync; -pub struct MockTransactionExecutor { +pub struct MockOneshotExecutor { call_responses: Box, tx_responses: Box, } -impl fmt::Debug for MockTransactionExecutor { +impl fmt::Debug for MockOneshotExecutor { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { formatter .debug_struct("MockTransactionExecutor") @@ -26,7 +26,7 @@ impl fmt::Debug for MockTransactionExecutor { } } -impl Default for MockTransactionExecutor { +impl Default for MockOneshotExecutor { fn default() -> Self { Self { call_responses: Box::new(|tx, _| { @@ -42,11 +42,11 @@ impl Default for MockTransactionExecutor { } } -impl MockTransactionExecutor { +impl MockOneshotExecutor { #[cfg(test)] pub(crate) fn set_call_responses(&mut self, responses: F) where - F: Fn(&Transaction, &BlockArgs) -> ExecutionResult + 'static + Send + Sync, + F: Fn(&Transaction, &OneshotEnv) -> ExecutionResult + 'static + Send + Sync, { self.call_responses = self.wrap_responses(responses); } @@ -54,7 +54,7 @@ impl MockTransactionExecutor { #[cfg(test)] pub(crate) fn set_tx_responses(&mut self, responses: F) where - F: Fn(&Transaction, &BlockArgs) -> ExecutionResult + 'static + Send + Sync, + F: Fn(&Transaction, &OneshotEnv) -> ExecutionResult + 'static + Send + Sync, { self.tx_responses = self.wrap_responses(responses); } @@ -62,12 +62,12 @@ impl MockTransactionExecutor { #[cfg(test)] fn wrap_responses(&mut self, responses: F) -> Box where - F: Fn(&Transaction, &BlockArgs) -> ExecutionResult + 'static + Send + Sync, + F: Fn(&Transaction, &OneshotEnv) -> ExecutionResult + 'static + Send + Sync, { Box::new( - move |tx: &Transaction, ba: &BlockArgs| -> VmExecutionResultAndLogs { + move |tx: &Transaction, env: &OneshotEnv| -> VmExecutionResultAndLogs { VmExecutionResultAndLogs { - result: responses(tx, ba), + result: responses(tx, env), logs: Default::default(), statistics: Default::default(), refunds: Default::default(), @@ -79,56 +79,54 @@ impl MockTransactionExecutor { #[cfg(test)] pub(crate) fn set_tx_responses_with_logs(&mut self, responses: F) where - F: Fn(&Transaction, &BlockArgs) -> VmExecutionResultAndLogs + 'static + Send + Sync, + F: Fn(&Transaction, &OneshotEnv) -> VmExecutionResultAndLogs + 'static + Send + Sync, { self.tx_responses = Box::new(responses); } - pub(crate) fn validate_tx( - &self, - tx: L2Tx, - block_args: &BlockArgs, - ) -> Result<(), ValidationError> { - let result = (self.tx_responses)(&tx.into(), block_args); - match result.result { - ExecutionResult::Success { .. } => Ok(()), - other => Err(ValidationError::Internal(anyhow::anyhow!( - "transaction validation failed: {other:?}" - ))), + fn mock_inspect(&self, env: OneshotEnv, args: TxExecutionArgs) -> VmExecutionResultAndLogs { + match env.system.execution_mode { + TxExecutionMode::EthCall => (self.call_responses)(&args.transaction, &env), + TxExecutionMode::VerifyExecute | TxExecutionMode::EstimateFee => { + (self.tx_responses)(&args.transaction, &env) + } } } +} - pub(crate) fn execute_tx( - &self, - tx: &Transaction, - block_args: &BlockArgs, - ) -> anyhow::Result { - let result = self.get_execution_result(tx, block_args); - let output = TransactionExecutionOutput { - vm: result, - metrics: TransactionExecutionMetrics::default(), - are_published_bytecodes_ok: true, - }; +#[async_trait] +impl OneshotExecutor for MockOneshotExecutor +where + S: ReadStorage + Send + 'static, +{ + type Tracers = (); - Ok(output) + async fn inspect_transaction( + &self, + _storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + (): Self::Tracers, + ) -> anyhow::Result { + Ok(self.mock_inspect(env, args)) } - fn get_execution_result( + async fn inspect_transaction_with_bytecode_compression( &self, - tx: &Transaction, - block_args: &BlockArgs, - ) -> VmExecutionResultAndLogs { - if let ExecuteTransactionCommon::L2(data) = &tx.common_data { - if data.input.is_none() { - return (self.call_responses)(tx, block_args); - } - } - (self.tx_responses)(tx, block_args) + _storage: S, + env: OneshotEnv, + args: TxExecutionArgs, + (): Self::Tracers, + ) -> anyhow::Result<( + Result<(), BytecodeCompressionError>, + VmExecutionResultAndLogs, + )> { + Ok((Ok(()), self.mock_inspect(env, args))) } } -impl From for TransactionExecutor { - fn from(executor: MockTransactionExecutor) -> Self { +impl From for TransactionExecutor { + fn from(executor: MockOneshotExecutor) -> Self { Self::Mock(executor) } } diff --git a/core/node/api_server/src/execution_sandbox/tests.rs b/core/node/api_server/src/execution_sandbox/tests.rs index 0a8af35597b..da593292e2e 100644 --- a/core/node/api_server/src/execution_sandbox/tests.rs +++ b/core/node/api_server/src/execution_sandbox/tests.rs @@ -4,9 +4,13 @@ use assert_matches::assert_matches; use zksync_dal::ConnectionPool; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_node_test_utils::{create_l2_block, create_l2_transaction, prepare_recovery_snapshot}; +use zksync_types::{api::state_override::StateOverride, Transaction}; use super::*; -use crate::{execution_sandbox::apply::apply_vm_in_sandbox, tx_sender::ApiContracts}; +use crate::{ + execution_sandbox::{apply::VmSandbox, storage::StorageWithOverrides}, + tx_sender::ApiContracts, +}; #[tokio::test] async fn creating_block_args() { @@ -165,43 +169,43 @@ async fn creating_block_args_after_snapshot_recovery() { #[tokio::test] async fn instantiating_vm() { let pool = ConnectionPool::::test_pool().await; - let mut storage = pool.connection().await.unwrap(); - insert_genesis_batch(&mut storage, &GenesisParams::mock()) + let mut connection = pool.connection().await.unwrap(); + insert_genesis_batch(&mut connection, &GenesisParams::mock()) .await .unwrap(); - let block_args = BlockArgs::pending(&mut storage).await.unwrap(); - test_instantiating_vm(pool.clone(), block_args).await; - let start_info = BlockStartInfo::new(&mut storage, Duration::MAX) + let block_args = BlockArgs::pending(&mut connection).await.unwrap(); + test_instantiating_vm(connection, block_args).await; + + let mut connection = pool.connection().await.unwrap(); + let start_info = BlockStartInfo::new(&mut connection, Duration::MAX) .await .unwrap(); - let block_args = BlockArgs::new(&mut storage, api::BlockId::Number(0.into()), &start_info) + let block_args = BlockArgs::new(&mut connection, api::BlockId::Number(0.into()), &start_info) .await .unwrap(); - test_instantiating_vm(pool.clone(), block_args).await; + test_instantiating_vm(connection, block_args).await; } -async fn test_instantiating_vm(pool: ConnectionPool, block_args: BlockArgs) { - let (vm_concurrency_limiter, _) = VmConcurrencyLimiter::new(1); - let vm_permit = vm_concurrency_limiter.acquire().await.unwrap(); - let transaction = create_l2_transaction(10, 100).into(); +async fn test_instantiating_vm(connection: Connection<'static, Core>, block_args: BlockArgs) { + let transaction = Transaction::from(create_l2_transaction(10, 100)); let estimate_gas_contracts = ApiContracts::load_from_disk().await.unwrap().estimate_gas; + + let execution_args = TxExecutionArgs::for_gas_estimate(transaction.clone()); + let (env, storage) = apply::prepare_env_and_storage( + connection, + TxSetupArgs::mock(TxExecutionMode::EstimateFee, estimate_gas_contracts), + &block_args, + ) + .await + .unwrap(); + let storage = StorageWithOverrides::new(storage, &StateOverride::default()); + tokio::task::spawn_blocking(move || { - apply_vm_in_sandbox( - vm_permit, - TxSharedArgs::mock(estimate_gas_contracts), - true, - &TxExecutionArgs::for_gas_estimate(None, &transaction, 123), - &pool, - transaction.clone(), - block_args, - None, - |_, received_tx, _| { - assert_eq!(received_tx, transaction); - }, - ) + VmSandbox::new(storage, env, execution_args).apply(|_, received_tx| { + assert_eq!(received_tx, transaction); + }); }) .await - .expect("VM instantiation panicked") - .expect("VM instantiation errored"); + .expect("VM execution panicked") } diff --git a/core/node/api_server/src/execution_sandbox/tracers.rs b/core/node/api_server/src/execution_sandbox/tracers.rs index 8d61d896a36..31384b7a089 100644 --- a/core/node/api_server/src/execution_sandbox/tracers.rs +++ b/core/node/api_server/src/execution_sandbox/tracers.rs @@ -3,26 +3,49 @@ use std::sync::Arc; use once_cell::sync::OnceCell; use zksync_multivm::{ interface::{storage::WriteStorage, Call}, - tracers::CallTracer, - vm_latest::HistoryMode, + tracers::{CallTracer, ValidationTracer, ValidationTracerParams, ViolatedValidationRule}, + vm_latest::HistoryDisabled, MultiVMTracer, MultiVmTracerPointer, }; +use zksync_types::ProtocolVersionId; -/// Custom tracers supported by our API +/// Custom tracers supported by the API sandbox. #[derive(Debug)] pub(crate) enum ApiTracer { CallTracer(Arc>>), + Validation { + params: ValidationTracerParams, + result: Arc>, + }, } impl ApiTracer { - pub fn into_boxed< - S: WriteStorage, - H: HistoryMode + zksync_multivm::HistoryMode + 'static, - >( + pub fn validation( + params: ValidationTracerParams, + ) -> (Self, Arc>) { + let result = Arc::>::default(); + let this = Self::Validation { + params, + result: result.clone(), + }; + (this, result) + } + + pub(super) fn into_boxed( self, - ) -> MultiVmTracerPointer { + protocol_version: ProtocolVersionId, + ) -> MultiVmTracerPointer + where + S: WriteStorage, + { match self { - ApiTracer::CallTracer(tracer) => CallTracer::new(tracer.clone()).into_tracer_pointer(), + Self::CallTracer(traces) => CallTracer::new(traces).into_tracer_pointer(), + Self::Validation { params, result } => { + let (mut tracer, _) = + ValidationTracer::::new(params, protocol_version.into()); + tracer.result = result; + tracer.into_tracer_pointer() + } } } } diff --git a/core/node/api_server/src/execution_sandbox/validate.rs b/core/node/api_server/src/execution_sandbox/validate.rs index a856386b456..a95cf6c3a91 100644 --- a/core/node/api_server/src/execution_sandbox/validate.rs +++ b/core/node/api_server/src/execution_sandbox/validate.rs @@ -1,23 +1,23 @@ use std::collections::HashSet; use anyhow::Context as _; -use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use tracing::Instrument; +use zksync_dal::{Connection, Core, CoreDal}; use zksync_multivm::{ - interface::{ExecutionResult, VmExecutionMode, VmInterface}, - tracers::{ - StorageInvocations, ValidationError as RawValidationError, ValidationTracer, - ValidationTracerParams, - }, - vm_latest::HistoryDisabled, - MultiVMTracer, + interface::ExecutionResult, + tracers::{ValidationError as RawValidationError, ValidationTracerParams}, +}; +use zksync_types::{ + api::state_override::StateOverride, l2::L2Tx, Address, TRUSTED_ADDRESS_SLOTS, + TRUSTED_TOKEN_SLOTS, }; -use zksync_types::{l2::L2Tx, Address, Transaction, TRUSTED_ADDRESS_SLOTS, TRUSTED_TOKEN_SLOTS}; use super::{ apply, execute::TransactionExecutor, + storage::StorageWithOverrides, vm_metrics::{SandboxStage, EXECUTION_METRICS, SANDBOX_METRICS}, - BlockArgs, TxExecutionArgs, TxSharedArgs, VmPermit, + ApiTracer, BlockArgs, OneshotExecutor, TxExecutionArgs, TxSetupArgs, VmPermit, }; /// Validation error used by the sandbox. Besides validation errors returned by VM, it also includes an internal error @@ -31,88 +31,46 @@ pub(crate) enum ValidationError { } impl TransactionExecutor { + #[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn validate_tx_in_sandbox( &self, - connection_pool: ConnectionPool, + mut connection: Connection<'static, Core>, vm_permit: VmPermit, tx: L2Tx, - shared_args: TxSharedArgs, + setup_args: TxSetupArgs, block_args: BlockArgs, computational_gas_limit: u32, ) -> Result<(), ValidationError> { - if let Self::Mock(mock) = self { - return mock.validate_tx(tx, &block_args); - } - - let stage_latency = SANDBOX_METRICS.sandbox[&SandboxStage::ValidateInSandbox].start(); - let mut connection = connection_pool - .connection_tagged("api") - .await - .context("failed acquiring DB connection")?; - let validation_params = get_validation_params( + let total_latency = SANDBOX_METRICS.sandbox[&SandboxStage::ValidateInSandbox].start(); + let params = get_validation_params( &mut connection, &tx, computational_gas_limit, - &shared_args.whitelisted_tokens_for_aa, + &setup_args.whitelisted_tokens_for_aa, ) .await .context("failed getting validation params")?; - drop(connection); - - let execution_args = TxExecutionArgs::for_validation(&tx); - let tx: Transaction = tx.into(); - - let validation_result = tokio::task::spawn_blocking(move || { - let span = tracing::debug_span!("validate_in_sandbox").entered(); - let result = apply::apply_vm_in_sandbox( - vm_permit, - shared_args, - true, - &execution_args, - &connection_pool, - tx, - block_args, - None, - |vm, tx, protocol_version| { - let stage_latency = SANDBOX_METRICS.sandbox[&SandboxStage::Validation].start(); - let span = tracing::debug_span!("validation").entered(); - vm.push_transaction(tx); - - let (tracer, validation_result) = ValidationTracer::::new( - validation_params, - protocol_version.into(), - ); - - let result = vm.inspect( - vec![ - tracer.into_tracer_pointer(), - StorageInvocations::new(execution_args.missed_storage_invocation_limit) - .into_tracer_pointer(), - ] - .into(), - VmExecutionMode::OneTx, - ); - - let result = match (result.result, validation_result.get()) { - (_, Some(err)) => Err(RawValidationError::ViolatedRule(err.clone())), - (ExecutionResult::Halt { reason }, _) => { - Err(RawValidationError::FailedTx(reason)) - } - (_, None) => Ok(()), - }; - - stage_latency.observe(); - span.exit(); - result - }, - ); - span.exit(); - result - }) - .await - .context("transaction validation panicked")??; + let (env, storage) = + apply::prepare_env_and_storage(connection, setup_args, &block_args).await?; + let storage = StorageWithOverrides::new(storage, &StateOverride::default()); + + let execution_args = TxExecutionArgs::for_validation(tx); + let (tracer, validation_result) = ApiTracer::validation(params); + let stage_latency = SANDBOX_METRICS.sandbox[&SandboxStage::Validation].start(); + let result = self + .inspect_transaction(storage, env, execution_args, vec![tracer]) + .instrument(tracing::debug_span!("validation")) + .await?; + drop(vm_permit); stage_latency.observe(); + + let validation_result = match (result.result, validation_result.get()) { + (_, Some(rule)) => Err(RawValidationError::ViolatedRule(rule.clone())), + (ExecutionResult::Halt { reason }, _) => Err(RawValidationError::FailedTx(reason)), + (_, None) => Ok(()), + }; + total_latency.observe(); validation_result.map_err(ValidationError::Vm) } } diff --git a/core/node/api_server/src/tx_sender/mod.rs b/core/node/api_server/src/tx_sender/mod.rs index cec2e14ddb2..c6f652da016 100644 --- a/core/node/api_server/src/tx_sender/mod.rs +++ b/core/node/api_server/src/tx_sender/mod.rs @@ -10,10 +10,10 @@ use zksync_dal::{ transactions_dal::L2TxSubmissionResult, Connection, ConnectionPool, Core, CoreDal, }; use zksync_multivm::{ - interface::{TransactionExecutionMetrics, VmExecutionResultAndLogs}, + interface::{TransactionExecutionMetrics, TxExecutionMode, VmExecutionResultAndLogs}, utils::{ adjust_pubdata_price_for_tx, derive_base_fee_and_gas_per_pubdata, derive_overhead, - get_eth_call_gas_limit, get_max_batch_gas_limit, + get_max_batch_gas_limit, }, vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; @@ -41,7 +41,7 @@ pub(super) use self::result::SubmitTxError; use self::{master_pool_sink::MasterPoolSink, tx_sink::TxSink}; use crate::{ execution_sandbox::{ - BlockArgs, SubmitTxStage, TransactionExecutor, TxExecutionArgs, TxSharedArgs, + BlockArgs, SubmitTxStage, TransactionExecutor, TxExecutionArgs, TxSetupArgs, VmConcurrencyBarrier, VmConcurrencyLimiter, VmPermit, SANDBOX_METRICS, }, tx_sender::result::ApiCallResult, @@ -252,6 +252,10 @@ impl TxSenderBuilder { self.whitelisted_tokens_for_aa_cache.unwrap_or_else(|| { Arc::new(RwLock::new(self.config.whitelisted_tokens_for_aa.clone())) }); + let missed_storage_invocation_limit = self + .config + .vm_execution_cache_misses_limit + .unwrap_or(usize::MAX); TxSender(Arc::new(TxSenderInner { sender_config: self.config, @@ -263,7 +267,7 @@ impl TxSenderBuilder { storage_caches, whitelisted_tokens_for_aa_cache, sealer, - executor: TransactionExecutor::Real, + executor: TransactionExecutor::real(missed_storage_invocation_limit), })) } } @@ -320,7 +324,7 @@ pub struct TxSenderInner { // Cache for white-listed tokens. pub(super) whitelisted_tokens_for_aa_cache: Arc>>, /// Batch sealer used to check whether transaction can be executed by the sequencer. - sealer: Arc, + pub(super) sealer: Arc, pub(super) executor: TransactionExecutor, } @@ -346,7 +350,7 @@ impl TxSender { self.0.whitelisted_tokens_for_aa_cache.read().await.clone() } - async fn acquire_replica_connection(&self) -> anyhow::Result> { + async fn acquire_replica_connection(&self) -> anyhow::Result> { self.0 .replica_connection_pool .connection_tagged("api") @@ -368,23 +372,20 @@ impl TxSender { stage_latency.observe(); let stage_latency = SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::DryRun); - let shared_args = self.shared_args().await?; + let setup_args = self.call_args(&tx, None).await?; let vm_permit = self.0.vm_concurrency_limiter.acquire().await; let vm_permit = vm_permit.ok_or(SubmitTxError::ServerShuttingDown)?; let mut connection = self.acquire_replica_connection().await?; let block_args = BlockArgs::pending(&mut connection).await?; - drop(connection); let execution_output = self .0 .executor .execute_tx_in_sandbox( vm_permit.clone(), - shared_args.clone(), - true, - TxExecutionArgs::for_validation(&tx), - self.0.replica_connection_pool.clone(), - tx.clone().into(), + setup_args.clone(), + TxExecutionArgs::for_validation(tx.clone()), + connection, block_args, None, vec![], @@ -398,15 +399,16 @@ impl TxSender { let stage_latency = SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::VerifyExecute); + let connection = self.acquire_replica_connection().await?; let computational_gas_limit = self.0.sender_config.validation_computational_gas_limit; let validation_result = self .0 .executor .validate_tx_in_sandbox( - self.0.replica_connection_pool.clone(), + connection, vm_permit, tx.clone(), - shared_args, + setup_args, block_args, computational_gas_limit, ) @@ -462,14 +464,23 @@ impl TxSender { /// **Important.** For the main node, this method acquires a DB connection inside `get_batch_fee_input()`. /// Thus, you shouldn't call it if you're holding a DB connection already. - async fn shared_args(&self) -> anyhow::Result { + async fn call_args( + &self, + tx: &L2Tx, + call_overrides: Option<&CallOverrides>, + ) -> anyhow::Result { let fee_input = self .0 .batch_fee_input_provider .get_batch_fee_input() .await .context("cannot get batch fee input")?; - Ok(TxSharedArgs { + Ok(TxSetupArgs { + execution_mode: if call_overrides.is_some() { + TxExecutionMode::EthCall + } else { + TxExecutionMode::VerifyExecute + }, operator_account: AccountTreeId::new(self.0.sender_config.fee_account_addr), fee_input, base_system_contracts: self.0.api_contracts.eth_call.clone(), @@ -480,6 +491,11 @@ impl TxSender { .validation_computational_gas_limit, chain_id: self.0.sender_config.chain_id, whitelisted_tokens_for_aa: self.read_whitelisted_tokens_for_aa_cache().await, + enforced_base_fee: if let Some(overrides) = call_overrides { + overrides.enforced_base_fee + } else { + Some(tx.common_data.fee.max_fee_per_gas.as_u64()) + }, }) } @@ -696,20 +712,17 @@ impl TxSender { } } - let shared_args = self.shared_args_for_gas_estimate(fee_model_params).await; - let vm_execution_cache_misses_limit = self.0.sender_config.vm_execution_cache_misses_limit; - let execution_args = - TxExecutionArgs::for_gas_estimate(vm_execution_cache_misses_limit, &tx, base_fee); + let setup_args = self.args_for_gas_estimate(fee_model_params, base_fee).await; + let execution_args = TxExecutionArgs::for_gas_estimate(tx); + let connection = self.acquire_replica_connection().await?; let execution_output = self .0 .executor .execute_tx_in_sandbox( vm_permit, - shared_args, - true, + setup_args, execution_args, - self.0.replica_connection_pool.clone(), - tx.clone(), + connection, block_args, state_override, vec![], @@ -718,10 +731,10 @@ impl TxSender { Ok((execution_output.vm, execution_output.metrics)) } - async fn shared_args_for_gas_estimate(&self, fee_input: BatchFeeInput) -> TxSharedArgs { + async fn args_for_gas_estimate(&self, fee_input: BatchFeeInput, base_fee: u64) -> TxSetupArgs { let config = &self.0.sender_config; - - TxSharedArgs { + TxSetupArgs { + execution_mode: TxExecutionMode::EstimateFee, operator_account: AccountTreeId::new(config.fee_account_addr), fee_input, // We want to bypass the computation gas limit check for gas estimation @@ -730,6 +743,7 @@ impl TxSender { caches: self.storage_caches(), chain_id: config.chain_id, whitelisted_tokens_for_aa: self.read_whitelisted_tokens_for_aa_cache().await, + enforced_base_fee: Some(base_fee), } } @@ -999,22 +1013,21 @@ impl TxSender { let vm_permit = self.0.vm_concurrency_limiter.acquire().await; let vm_permit = vm_permit.ok_or(SubmitTxError::ServerShuttingDown)?; - let vm_execution_cache_misses_limit = self.0.sender_config.vm_execution_cache_misses_limit; - self.0 + let connection = self.acquire_replica_connection().await?; + let result = self + .0 .executor - .execute_tx_eth_call( + .execute_tx_in_sandbox( vm_permit, - self.shared_args().await?, - self.0.replica_connection_pool.clone(), - call_overrides, - tx, + self.call_args(&tx, Some(&call_overrides)).await?, + TxExecutionArgs::for_eth_call(tx), + connection, block_args, - vm_execution_cache_misses_limit, - vec![], state_override, + vec![], ) - .await? - .into_api_call_result() + .await?; + result.vm.into_api_call_result() } pub async fn gas_price(&self) -> anyhow::Result { @@ -1067,19 +1080,4 @@ impl TxSender { } Ok(()) } - - pub(crate) async fn get_default_eth_call_gas( - &self, - block_args: BlockArgs, - ) -> anyhow::Result { - let mut connection = self.acquire_replica_connection().await?; - - let protocol_version = block_args - .resolve_block_info(&mut connection) - .await - .context("failed to resolve block info")? - .protocol_version; - - Ok(get_eth_call_gas_limit(protocol_version.into())) - } } diff --git a/core/node/api_server/src/tx_sender/tests.rs b/core/node/api_server/src/tx_sender/tests.rs index 06b6b7a1301..5f0f0dc925a 100644 --- a/core/node/api_server/src/tx_sender/tests.rs +++ b/core/node/api_server/src/tx_sender/tests.rs @@ -10,7 +10,7 @@ use zksync_utils::u256_to_h256; use super::*; use crate::{ - execution_sandbox::testonly::MockTransactionExecutor, web3::testonly::create_test_tx_sender, + execution_sandbox::testonly::MockOneshotExecutor, web3::testonly::create_test_tx_sender, }; #[tokio::test] @@ -31,7 +31,7 @@ async fn getting_nonce_for_account() { .await .unwrap(); - let tx_executor = MockTransactionExecutor::default().into(); + let tx_executor = MockOneshotExecutor::default().into(); let (tx_sender, _) = create_test_tx_sender(pool.clone(), l2_chain_id, tx_executor).await; let nonce = tx_sender.get_expected_nonce(test_address).await.unwrap(); @@ -81,7 +81,7 @@ async fn getting_nonce_for_account_after_snapshot_recovery() { .await; let l2_chain_id = L2ChainId::default(); - let tx_executor = MockTransactionExecutor::default().into(); + let tx_executor = MockOneshotExecutor::default().into(); let (tx_sender, _) = create_test_tx_sender(pool.clone(), l2_chain_id, tx_executor).await; storage @@ -136,7 +136,7 @@ async fn submitting_tx_requires_one_connection() { .unwrap(); drop(storage); - let mut tx_executor = MockTransactionExecutor::default(); + let mut tx_executor = MockOneshotExecutor::default(); tx_executor.set_tx_responses(move |received_tx, _| { assert_eq!(received_tx.hash(), tx_hash); ExecutionResult::Success { output: vec![] } diff --git a/core/node/api_server/src/web3/namespaces/debug.rs b/core/node/api_server/src/web3/namespaces/debug.rs index e71f4bd1e1e..473391476a3 100644 --- a/core/node/api_server/src/web3/namespaces/debug.rs +++ b/core/node/api_server/src/web3/namespaces/debug.rs @@ -4,7 +4,7 @@ use anyhow::Context as _; use once_cell::sync::OnceCell; use zksync_dal::{CoreDal, DalError}; use zksync_multivm::{ - interface::{Call, CallType, ExecutionResult}, + interface::{Call, CallType, ExecutionResult, TxExecutionMode}, vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; use zksync_system_constants::MAX_ENCODED_TX_SIZE; @@ -19,7 +19,7 @@ use zksync_types::{ use zksync_web3_decl::error::Web3Error; use crate::{ - execution_sandbox::{ApiTracer, TxSharedArgs}, + execution_sandbox::{ApiTracer, TxExecutionArgs, TxSetupArgs}, tx_sender::{ApiContracts, TxSenderConfig}, web3::{backend_jsonrpsee::MethodTracer, state::RpcState}, }; @@ -167,29 +167,20 @@ impl DebugNamespace { .state .resolve_block_args(&mut connection, block_id) .await?; - drop(connection); - self.current_method().set_block_diff( self.state .last_sealed_l2_block .diff_with_block_args(&block_args), ); - if request.gas.is_none() { - request.gas = Some( - self.state - .tx_sender - .get_default_eth_call_gas(block_args) - .await - .map_err(Web3Error::InternalError)? - .into(), - ) + request.gas = Some(block_args.default_eth_call_gas(&mut connection).await?); } + drop(connection); let call_overrides = request.get_call_overrides()?; let tx = L2Tx::from_request(request.into(), MAX_ENCODED_TX_SIZE)?; - let shared_args = self.shared_args().await; + let setup_args = self.call_args(call_overrides.enforced_base_fee).await; let vm_permit = self .state .tx_sender @@ -206,20 +197,20 @@ impl DebugNamespace { vec![ApiTracer::CallTracer(call_tracer_result.clone())] }; + let connection = self.state.acquire_connection().await?; let executor = &self.state.tx_sender.0.executor; let result = executor - .execute_tx_eth_call( + .execute_tx_in_sandbox( vm_permit, - shared_args, - self.state.connection_pool.clone(), - call_overrides, - tx.clone(), + setup_args, + TxExecutionArgs::for_eth_call(tx.clone()), + connection, block_args, - self.sender_config().vm_execution_cache_misses_limit, - custom_tracers, None, + custom_tracers, ) - .await?; + .await? + .vm; let (output, revert_reason) = match result.result { ExecutionResult::Success { output, .. } => (output, None), @@ -249,9 +240,10 @@ impl DebugNamespace { Ok(Self::map_call(call, false)) } - async fn shared_args(&self) -> TxSharedArgs { + async fn call_args(&self, enforced_base_fee: Option) -> TxSetupArgs { let sender_config = self.sender_config(); - TxSharedArgs { + TxSetupArgs { + execution_mode: TxExecutionMode::EthCall, operator_account: AccountTreeId::default(), fee_input: self.batch_fee_input, base_system_contracts: self.api_contracts.eth_call.clone(), @@ -263,6 +255,7 @@ impl DebugNamespace { .tx_sender .read_whitelisted_tokens_for_aa_cache() .await, + enforced_base_fee, } } } diff --git a/core/node/api_server/src/web3/namespaces/eth.rs b/core/node/api_server/src/web3/namespaces/eth.rs index c3bed64a146..fda5ff6f06b 100644 --- a/core/node/api_server/src/web3/namespaces/eth.rs +++ b/core/node/api_server/src/web3/namespaces/eth.rs @@ -70,18 +70,11 @@ impl EthNamespace { .last_sealed_l2_block .diff_with_block_args(&block_args), ); - drop(connection); - if request.gas.is_none() { - request.gas = Some( - self.state - .tx_sender - .get_default_eth_call_gas(block_args) - .await - .map_err(Web3Error::InternalError)? - .into(), - ) + request.gas = Some(block_args.default_eth_call_gas(&mut connection).await?); } + drop(connection); + let call_overrides = request.get_call_overrides()?; let tx = L2Tx::from_request(request.into(), self.state.api_config.max_tx_size)?; diff --git a/core/node/api_server/src/web3/testonly.rs b/core/node/api_server/src/web3/testonly.rs index 0f8c71aa628..d8e7d0b6539 100644 --- a/core/node/api_server/src/web3/testonly.rs +++ b/core/node/api_server/src/web3/testonly.rs @@ -8,11 +8,12 @@ use zksync_dal::ConnectionPool; use zksync_health_check::CheckHealth; use zksync_node_fee_model::MockBatchFeeParamsProvider; use zksync_state::PostgresStorageCaches; +use zksync_state_keeper::seal_criteria::NoopSealer; use zksync_types::L2ChainId; use super::{metrics::ApiTransportLabel, *}; use crate::{ - execution_sandbox::{testonly::MockTransactionExecutor, TransactionExecutor}, + execution_sandbox::{testonly::MockOneshotExecutor, TransactionExecutor}, tx_sender::TxSenderConfig, }; @@ -48,7 +49,9 @@ pub(crate) async fn create_test_tx_sender( .await .expect("failed building transaction sender"); - Arc::get_mut(&mut tx_sender.0).unwrap().executor = tx_executor; + let tx_sender_inner = Arc::get_mut(&mut tx_sender.0).unwrap(); + tx_sender_inner.executor = tx_executor; + tx_sender_inner.sealer = Arc::new(NoopSealer); // prevents "unexecutable transaction" errors (tx_sender, vm_barrier) } @@ -99,7 +102,7 @@ impl ApiServerHandles { pub async fn spawn_http_server( api_config: InternalApiConfig, pool: ConnectionPool, - tx_executor: MockTransactionExecutor, + tx_executor: MockOneshotExecutor, method_tracer: Arc, stop_receiver: watch::Receiver, ) -> ApiServerHandles { @@ -127,7 +130,7 @@ pub async fn spawn_ws_server( api_config, pool, websocket_requests_per_minute_limit, - MockTransactionExecutor::default(), + MockOneshotExecutor::default(), Arc::default(), stop_receiver, ) @@ -139,7 +142,7 @@ async fn spawn_server( api_config: InternalApiConfig, pool: ConnectionPool, websocket_requests_per_minute_limit: Option, - tx_executor: MockTransactionExecutor, + tx_executor: MockOneshotExecutor, method_tracer: Arc, stop_receiver: watch::Receiver, ) -> (ApiServerHandles, mpsc::UnboundedReceiver) { diff --git a/core/node/api_server/src/web3/tests/mod.rs b/core/node/api_server/src/web3/tests/mod.rs index 409eb2004d1..5617b097c0c 100644 --- a/core/node/api_server/src/web3/tests/mod.rs +++ b/core/node/api_server/src/web3/tests/mod.rs @@ -26,9 +26,12 @@ use zksync_node_test_utils::{ create_l1_batch, create_l1_batch_metadata, create_l2_block, create_l2_transaction, l1_batch_metadata_to_commitment_artifacts, prepare_recovery_snapshot, }; +use zksync_system_constants::{ + SYSTEM_CONTEXT_ADDRESS, SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, +}; use zksync_types::{ api, - block::L2BlockHeader, + block::{pack_block_info, L2BlockHeader}, get_nonce_key, l2::L2Tx, storage::get_code_key, @@ -55,7 +58,7 @@ use zksync_web3_decl::{ use super::*; use crate::{ - execution_sandbox::testonly::MockTransactionExecutor, + execution_sandbox::testonly::MockOneshotExecutor, web3::testonly::{spawn_http_server, spawn_ws_server}, }; @@ -135,8 +138,8 @@ trait HttpTest: Send + Sync { StorageInitialization::Genesis } - fn transaction_executor(&self) -> MockTransactionExecutor { - MockTransactionExecutor::default() + fn transaction_executor(&self) -> MockOneshotExecutor { + MockOneshotExecutor::default() } fn method_tracer(&self) -> Arc { @@ -174,7 +177,7 @@ impl StorageInitialization { } async fn prepare_storage( - &self, + self, network_config: &NetworkConfig, storage: &mut Connection<'_, Core>, ) -> anyhow::Result<()> { @@ -189,17 +192,33 @@ impl StorageInitialization { insert_genesis_batch(storage, ¶ms).await?; } } - Self::Recovery { logs, factory_deps } => { + Self::Recovery { + mut logs, + factory_deps, + } => { + let l2_block_info_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, + ); + let block_info = pack_block_info( + Self::SNAPSHOT_RECOVERY_BLOCK.0.into(), + Self::SNAPSHOT_RECOVERY_BLOCK.0.into(), + ); + logs.push(StorageLog::new_write_log( + l2_block_info_key, + u256_to_h256(block_info), + )); + prepare_recovery_snapshot( storage, Self::SNAPSHOT_RECOVERY_BATCH, Self::SNAPSHOT_RECOVERY_BLOCK, - logs, + &logs, ) .await; storage .factory_deps_dal() - .insert_factory_deps(Self::SNAPSHOT_RECOVERY_BLOCK, factory_deps) + .insert_factory_deps(Self::SNAPSHOT_RECOVERY_BLOCK, &factory_deps) .await?; // Insert the next L1 batch in the storage so that the API server doesn't hang up. @@ -282,7 +301,7 @@ fn execute_l2_transaction(transaction: L2Tx) -> TransactionExecutionResult { } } -/// Stores L2 block with a single transaction and returns the L2 block header + transaction hash. +/// Stores L2 block and returns the L2 block header. async fn store_l2_block( storage: &mut Connection<'_, Core>, number: L2BlockNumber, @@ -298,6 +317,18 @@ async fn store_l2_block( assert_matches!(tx_submission_result, L2TxSubmissionResult::Added); } + // Record L2 block info which is read by the VM sandbox logic + let l2_block_info_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, + ); + let block_info = pack_block_info(number.0.into(), number.0.into()); + let l2_block_log = StorageLog::new_write_log(l2_block_info_key, u256_to_h256(block_info)); + storage + .storage_logs_dal() + .append_storage_logs(number, &[l2_block_log]) + .await?; + let new_l2_block = create_l2_block(number.0); storage.blocks_dal().insert_l2_block(&new_l2_block).await?; storage diff --git a/core/node/api_server/src/web3/tests/vm.rs b/core/node/api_server/src/web3/tests/vm.rs index 90e1373a5cc..5b04250eebf 100644 --- a/core/node/api_server/src/web3/tests/vm.rs +++ b/core/node/api_server/src/web3/tests/vm.rs @@ -30,15 +30,15 @@ impl CallTest { } } - fn create_executor(only_block: L2BlockNumber) -> MockTransactionExecutor { - let mut tx_executor = MockTransactionExecutor::default(); - tx_executor.set_call_responses(move |tx, block_args| { + fn create_executor(latest_block: L2BlockNumber) -> MockOneshotExecutor { + let mut tx_executor = MockOneshotExecutor::default(); + tx_executor.set_call_responses(move |tx, env| { let expected_block_number = match tx.execute.calldata() { - b"pending" => only_block + 1, - b"first" => only_block, + b"pending" => latest_block + 1, + b"latest" => latest_block, data => panic!("Unexpected calldata: {data:?}"), }; - assert_eq!(block_args.resolved_block_number(), expected_block_number); + assert_eq!(env.l1_batch.first_l2_block.number, expected_block_number.0); ExecutionResult::Success { output: b"output".to_vec(), @@ -50,15 +50,20 @@ impl CallTest { #[async_trait] impl HttpTest for CallTest { - fn transaction_executor(&self) -> MockTransactionExecutor { - Self::create_executor(L2BlockNumber(0)) + fn transaction_executor(&self) -> MockOneshotExecutor { + Self::create_executor(L2BlockNumber(1)) } async fn test( &self, client: &DynClient, - _pool: &ConnectionPool, + pool: &ConnectionPool, ) -> anyhow::Result<()> { + // Store an additional L2 block because L2 block #0 has some special processing making it work incorrectly. + let mut connection = pool.connection().await?; + store_l2_block(&mut connection, L2BlockNumber(1), &[]).await?; + drop(connection); + let call_result = client .call(Self::call_request(b"pending"), None, None) .await?; @@ -66,8 +71,8 @@ impl HttpTest for CallTest { let valid_block_numbers_and_calldata = [ (api::BlockNumber::Pending, b"pending" as &[_]), - (api::BlockNumber::Latest, b"first"), - (0.into(), b"first"), + (api::BlockNumber::Latest, b"latest"), + (0.into(), b"latest"), ]; for (number, calldata) in valid_block_numbers_and_calldata { let number = api::BlockIdVariant::BlockNumber(number); @@ -107,7 +112,7 @@ impl HttpTest for CallTestAfterSnapshotRecovery { StorageInitialization::empty_recovery() } - fn transaction_executor(&self) -> MockTransactionExecutor { + fn transaction_executor(&self) -> MockOneshotExecutor { let first_local_l2_block = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; CallTest::create_executor(first_local_l2_block) } @@ -146,7 +151,7 @@ impl HttpTest for CallTestAfterSnapshotRecovery { for number in first_l2_block_numbers { let number = api::BlockIdVariant::BlockNumber(number); let call_result = client - .call(CallTest::call_request(b"first"), Some(number), None) + .call(CallTest::call_request(b"latest"), Some(number), None) .await?; assert_eq!(call_result.0, b"output"); } @@ -213,16 +218,16 @@ impl HttpTest for SendRawTransactionTest { } } - fn transaction_executor(&self) -> MockTransactionExecutor { - let mut tx_executor = MockTransactionExecutor::default(); + fn transaction_executor(&self) -> MockOneshotExecutor { + let mut tx_executor = MockOneshotExecutor::default(); let pending_block = if self.snapshot_recovery { StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 2 } else { L2BlockNumber(1) }; - tx_executor.set_tx_responses(move |tx, block_args| { + tx_executor.set_tx_responses(move |tx, env| { assert_eq!(tx.hash(), Self::transaction_bytes_and_hash().1); - assert_eq!(block_args.resolved_block_number(), pending_block); + assert_eq!(env.l1_batch.first_l2_block.number, pending_block.0); ExecutionResult::Success { output: vec![] } }); tx_executor @@ -311,8 +316,8 @@ impl SendTransactionWithDetailedOutputTest { } #[async_trait] impl HttpTest for SendTransactionWithDetailedOutputTest { - fn transaction_executor(&self) -> MockTransactionExecutor { - let mut tx_executor = MockTransactionExecutor::default(); + fn transaction_executor(&self) -> MockOneshotExecutor { + let mut tx_executor = MockOneshotExecutor::default(); let tx_bytes_and_hash = SendRawTransactionTest::transaction_bytes_and_hash(); let vm_execution_logs = VmExecutionLogs { storage_logs: self.storage_logs(), @@ -322,9 +327,9 @@ impl HttpTest for SendTransactionWithDetailedOutputTest { total_log_queries_count: 0, }; - tx_executor.set_tx_responses_with_logs(move |tx, block_args| { + tx_executor.set_tx_responses_with_logs(move |tx, env| { assert_eq!(tx.hash(), tx_bytes_and_hash.1); - assert_eq!(block_args.resolved_block_number(), L2BlockNumber(1)); + assert_eq!(env.l1_batch.first_l2_block.number, 1); VmExecutionResultAndLogs { result: ExecutionResult::Success { output: vec![] }, @@ -406,15 +411,20 @@ impl TraceCallTest { #[async_trait] impl HttpTest for TraceCallTest { - fn transaction_executor(&self) -> MockTransactionExecutor { - CallTest::create_executor(L2BlockNumber(0)) + fn transaction_executor(&self) -> MockOneshotExecutor { + CallTest::create_executor(L2BlockNumber(1)) } async fn test( &self, client: &DynClient, - _pool: &ConnectionPool, + pool: &ConnectionPool, ) -> anyhow::Result<()> { + // Store an additional L2 block because L2 block #0 has some special processing making it work incorrectly. + let mut connection = pool.connection().await?; + store_l2_block(&mut connection, L2BlockNumber(1), &[]).await?; + drop(connection); + let call_request = CallTest::call_request(b"pending"); let call_result = client.trace_call(call_request.clone(), None, None).await?; Self::assert_debug_call(&call_request, &call_result); @@ -424,13 +434,9 @@ impl HttpTest for TraceCallTest { .await?; Self::assert_debug_call(&call_request, &call_result); - let genesis_block_numbers = [ - api::BlockNumber::Earliest, - api::BlockNumber::Latest, - 0.into(), - ]; - let call_request = CallTest::call_request(b"first"); - for number in genesis_block_numbers { + let latest_block_numbers = [api::BlockNumber::Latest, 1.into()]; + let call_request = CallTest::call_request(b"latest"); + for number in latest_block_numbers { let call_result = client .trace_call( call_request.clone(), @@ -474,7 +480,7 @@ impl HttpTest for TraceCallTestAfterSnapshotRecovery { StorageInitialization::empty_recovery() } - fn transaction_executor(&self) -> MockTransactionExecutor { + fn transaction_executor(&self) -> MockOneshotExecutor { let number = StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 1; CallTest::create_executor(number) } @@ -504,7 +510,7 @@ impl HttpTest for TraceCallTestAfterSnapshotRecovery { assert_pruned_block_error(&error, first_local_l2_block); } - let call_request = CallTest::call_request(b"first"); + let call_request = CallTest::call_request(b"latest"); let first_l2_block_numbers = [api::BlockNumber::Latest, first_local_l2_block.0.into()]; for number in first_l2_block_numbers { let number = api::BlockId::Number(number); @@ -544,18 +550,18 @@ impl HttpTest for EstimateGasTest { SendRawTransactionTest { snapshot_recovery }.storage_initialization() } - fn transaction_executor(&self) -> MockTransactionExecutor { - let mut tx_executor = MockTransactionExecutor::default(); + fn transaction_executor(&self) -> MockOneshotExecutor { + let mut tx_executor = MockOneshotExecutor::default(); let pending_block_number = if self.snapshot_recovery { StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 2 } else { L2BlockNumber(1) }; let gas_limit_threshold = self.gas_limit_threshold.clone(); - tx_executor.set_call_responses(move |tx, block_args| { + tx_executor.set_tx_responses(move |tx, env| { assert_eq!(tx.execute.calldata(), [] as [u8; 0]); assert_eq!(tx.nonce(), Some(Nonce(0))); - assert_eq!(block_args.resolved_block_number(), pending_block_number); + assert_eq!(env.l1_batch.first_l2_block.number, pending_block_number.0); let gas_limit_threshold = gas_limit_threshold.load(Ordering::SeqCst); if tx.gas_limit() >= U256::from(gas_limit_threshold) { @@ -637,49 +643,17 @@ async fn estimate_gas_after_snapshot_recovery() { #[derive(Debug)] struct EstimateGasWithStateOverrideTest { - gas_limit_threshold: Arc, - snapshot_recovery: bool, -} - -impl EstimateGasWithStateOverrideTest { - fn new(snapshot_recovery: bool) -> Self { - Self { - gas_limit_threshold: Arc::default(), - snapshot_recovery, - } - } + inner: EstimateGasTest, } #[async_trait] impl HttpTest for EstimateGasWithStateOverrideTest { fn storage_initialization(&self) -> StorageInitialization { - let snapshot_recovery = self.snapshot_recovery; - SendRawTransactionTest { snapshot_recovery }.storage_initialization() + self.inner.storage_initialization() } - fn transaction_executor(&self) -> MockTransactionExecutor { - let mut tx_executor = MockTransactionExecutor::default(); - let pending_block_number = if self.snapshot_recovery { - StorageInitialization::SNAPSHOT_RECOVERY_BLOCK + 2 - } else { - L2BlockNumber(1) - }; - let gas_limit_threshold = self.gas_limit_threshold.clone(); - tx_executor.set_call_responses(move |tx, block_args| { - assert_eq!(tx.execute.calldata(), [] as [u8; 0]); - assert_eq!(tx.nonce(), Some(Nonce(0))); - assert_eq!(block_args.resolved_block_number(), pending_block_number); - - let gas_limit_threshold = gas_limit_threshold.load(Ordering::SeqCst); - if tx.gas_limit() >= U256::from(gas_limit_threshold) { - ExecutionResult::Success { output: vec![] } - } else { - ExecutionResult::Revert { - output: VmRevertReason::VmError, - } - } - }); - tx_executor + fn transaction_executor(&self) -> MockOneshotExecutor { + self.inner.transaction_executor() } async fn test( @@ -735,5 +709,6 @@ impl HttpTest for EstimateGasWithStateOverrideTest { #[tokio::test] async fn estimate_gas_with_state_override() { - test_http_server(EstimateGasWithStateOverrideTest::new(false)).await; + let inner = EstimateGasTest::new(false); + test_http_server(EstimateGasWithStateOverrideTest { inner }).await; }