Skip to content

Commit

Permalink
refactor(vm): Refactor L1 batch params provider (#2891)
Browse files Browse the repository at this point in the history
## What ❔

Refactors `L1BatchParamsProvider`, in particular its construction.

## Why ❔

To have more intuitive DevEx.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored and Deniallugo committed Sep 24, 2024
1 parent 44f743b commit 7ffca64
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 101 deletions.
40 changes: 39 additions & 1 deletion core/lib/vm_executor/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,15 @@ pub struct L1BatchParamsProvider {
}

impl L1BatchParamsProvider {
pub fn new() -> Self {
/// Creates a new provider.
pub async fn new(storage: &mut Connection<'_, Core>) -> anyhow::Result<Self> {
let mut this = Self::uninitialized();
this.initialize(storage).await?;
Ok(this)
}

/// Creates an uninitialized provider. Before use, it must be [`initialize`](Self::initialize())d.
pub fn uninitialized() -> Self {
Self { snapshot: None }
}

Expand Down Expand Up @@ -323,4 +331,34 @@ impl L1BatchParamsProvider {
chain_id,
))
}

/// Combines [`Self::load_first_l2_block_in_batch()`] and [Self::load_l1_batch_params()`]. Returns `Ok(None)`
/// iff the requested batch doesn't have any persisted blocks.
///
/// Prefer using this method unless you need to manipulate / inspect the first block in the batch.
pub async fn load_l1_batch_env(
&self,
storage: &mut Connection<'_, Core>,
number: L1BatchNumber,
validation_computational_gas_limit: u32,
chain_id: L2ChainId,
) -> anyhow::Result<Option<(SystemEnv, L1BatchEnv)>> {
let first_l2_block = self
.load_first_l2_block_in_batch(storage, number)
.await
.with_context(|| format!("failed loading first L2 block for L1 batch #{number}"))?;
let Some(first_l2_block) = first_l2_block else {
return Ok(None);
};

self.load_l1_batch_params(
storage,
&first_l2_block,
validation_computational_gas_limit,
chain_id,
)
.await
.with_context(|| format!("failed loading params for L1 batch #{number}"))
.map(Some)
}
}
2 changes: 1 addition & 1 deletion core/node/node_sync/src/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl ExternalIO {
main_node_client: Box<dyn MainNodeClient>,
chain_id: L2ChainId,
) -> anyhow::Result<Self> {
let l1_batch_params_provider = L1BatchParamsProvider::new();
let l1_batch_params_provider = L1BatchParamsProvider::uninitialized();
Ok(Self {
pool,
l1_batch_params_provider,
Expand Down
58 changes: 19 additions & 39 deletions core/node/state_keeper/src/io/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ async fn waiting_for_l1_batch_params_with_genesis() {
.await
.unwrap();

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let (hash, timestamp) = provider
.wait_for_l1_batch_params(&mut storage, L1BatchNumber(0))
.await
Expand Down Expand Up @@ -143,8 +142,7 @@ async fn waiting_for_l1_batch_params_after_snapshot_recovery() {
let snapshot_recovery =
prepare_recovery_snapshot(&mut storage, L1BatchNumber(23), L2BlockNumber(42), &[]).await;

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let (hash, timestamp) = provider
.wait_for_l1_batch_params(&mut storage, snapshot_recovery.l1_batch_number)
.await
Expand Down Expand Up @@ -192,8 +190,7 @@ async fn getting_first_l2_block_in_batch_with_genesis() {
.await
.unwrap();

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let mut batches_and_l2_blocks = HashMap::from([
(L1BatchNumber(0), Ok(Some(L2BlockNumber(0)))),
(L1BatchNumber(1), Ok(Some(L2BlockNumber(1)))),
Expand Down Expand Up @@ -264,8 +261,7 @@ async fn getting_first_l2_block_in_batch_after_snapshot_recovery() {
let snapshot_recovery =
prepare_recovery_snapshot(&mut storage, L1BatchNumber(23), L2BlockNumber(42), &[]).await;

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let mut batches_and_l2_blocks = HashMap::from([
(L1BatchNumber(1), Err(())),
(snapshot_recovery.l1_batch_number, Err(())),
Expand Down Expand Up @@ -321,24 +317,20 @@ async fn loading_pending_batch_with_genesis() {
)
.await;

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let first_l2_block_in_batch = provider
.load_first_l2_block_in_batch(&mut storage, L1BatchNumber(1))
.await
.unwrap()
.expect("no first L2 block");
assert_eq!(first_l2_block_in_batch.number(), L2BlockNumber(1));

let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let (system_env, l1_batch_env) = provider
.load_l1_batch_params(
.load_l1_batch_env(
&mut storage,
&first_l2_block_in_batch,
L1BatchNumber(1),
u32::MAX,
L2ChainId::default(),
)
.await
.unwrap();
.unwrap()
.expect("no L1 batch");

assert_eq!(l1_batch_env.first_l2_block.number, 1);

let pending_batch = load_pending_batch(&mut storage, system_env, l1_batch_env)
.await
.unwrap();
Expand Down Expand Up @@ -403,27 +395,17 @@ async fn loading_pending_batch_after_snapshot_recovery() {
)
.await;

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let first_l2_block_in_batch = provider
.load_first_l2_block_in_batch(&mut storage, snapshot_recovery.l1_batch_number + 1)
.await
.unwrap()
.expect("no first L2 block");
assert_eq!(
first_l2_block_in_batch.number(),
snapshot_recovery.l2_block_number + 1
);

let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let (system_env, l1_batch_env) = provider
.load_l1_batch_params(
.load_l1_batch_env(
&mut storage,
&first_l2_block_in_batch,
snapshot_recovery.l1_batch_number + 1,
u32::MAX,
L2ChainId::default(),
)
.await
.unwrap();
.unwrap()
.expect("no L1 batch");
let pending_batch = load_pending_batch(&mut storage, system_env, l1_batch_env)
.await
.unwrap();
Expand Down Expand Up @@ -466,8 +448,7 @@ async fn getting_batch_version_with_genesis() {
.await
.unwrap();

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let version = provider
.load_l1_batch_protocol_version(&mut storage, L1BatchNumber(0))
.await
Expand Down Expand Up @@ -506,8 +487,7 @@ async fn getting_batch_version_after_snapshot_recovery() {
let snapshot_recovery =
prepare_recovery_snapshot(&mut storage, L1BatchNumber(23), L2BlockNumber(42), &[]).await;

let mut provider = L1BatchParamsProvider::new();
provider.initialize(&mut storage).await.unwrap();
let provider = L1BatchParamsProvider::new(&mut storage).await.unwrap();
let version = provider
.load_l1_batch_protocol_version(&mut storage, snapshot_recovery.l1_batch_number)
.await
Expand Down
28 changes: 8 additions & 20 deletions core/node/state_keeper/src/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,30 +97,18 @@ impl StateKeeperIO for MempoolIO {

L2BlockSealProcess::clear_pending_l2_block(&mut storage, cursor.next_l2_block - 1).await?;

let pending_l2_block_header = self
let Some((system_env, l1_batch_env)) = self
.l1_batch_params_provider
.load_first_l2_block_in_batch(&mut storage, cursor.l1_batch)
.await
.with_context(|| {
format!(
"failed loading first L2 block for L1 batch #{}",
cursor.l1_batch
)
})?;
let Some(pending_l2_block_header) = pending_l2_block_header else {
return Ok((cursor, None));
};

let (system_env, l1_batch_env) = self
.l1_batch_params_provider
.load_l1_batch_params(
.load_l1_batch_env(
&mut storage,
&pending_l2_block_header,
cursor.l1_batch,
self.validation_computational_gas_limit,
self.chain_id,
)
.await
.with_context(|| format!("failed loading params for L1 batch #{}", cursor.l1_batch))?;
.await?
else {
return Ok((cursor, None));
};
let pending_batch_data = load_pending_batch(&mut storage, system_env, l1_batch_env)
.await
.with_context(|| {
Expand Down Expand Up @@ -436,7 +424,7 @@ impl MempoolIO {
l2_block_max_payload_size_sealer: L2BlockMaxPayloadSizeSealer::new(config),
filter: L2TxFilter::default(),
// ^ Will be initialized properly on the first newly opened batch
l1_batch_params_provider: L1BatchParamsProvider::new(),
l1_batch_params_provider: L1BatchParamsProvider::uninitialized(),
fee_account,
validation_computational_gas_limit: config.validation_computational_gas_limit,
max_allowed_tx_gas_limit: config.max_allowed_l2_tx_gas_limit.into(),
Expand Down
20 changes: 5 additions & 15 deletions core/node/tee_verifier_input_producer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,34 +77,24 @@ impl TeeVerifierInputProducer {
.with_context(|| format!("header is missing for L1 batch #{l1_batch_number}"))?
.unwrap();

let mut l1_batch_params_provider = L1BatchParamsProvider::new();
l1_batch_params_provider
.initialize(&mut connection)
let l1_batch_params_provider = L1BatchParamsProvider::new(&mut connection)
.await
.context("failed initializing L1 batch params provider")?;

let first_miniblock_in_batch = l1_batch_params_provider
.load_first_l2_block_in_batch(&mut connection, l1_batch_number)
.await
.with_context(|| {
format!("failed loading first miniblock in L1 batch #{l1_batch_number}")
})?
.with_context(|| format!("no miniblocks persisted for L1 batch #{l1_batch_number}"))?;

// In the state keeper, this value is used to reject execution.
// All batches have already been executed by State Keeper.
// This means we don't want to reject any execution, therefore we're using MAX as an allow all.
let validation_computational_gas_limit = u32::MAX;

let (system_env, l1_batch_env) = l1_batch_params_provider
.load_l1_batch_params(
.load_l1_batch_env(
&mut connection,
&first_miniblock_in_batch,
l1_batch_number,
validation_computational_gas_limit,
l2_chain_id,
)
.await
.context("expected miniblock to be executed and sealed")?;
.await?
.with_context(|| format!("expected L1 batch #{l1_batch_number} to be sealed"))?;

let used_contract_hashes = l1_batch_header
.used_contract_hashes
Expand Down
37 changes: 12 additions & 25 deletions core/node/vm_runner/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ pub(crate) struct PostgresLoader {

impl PostgresLoader {
pub async fn new(pool: ConnectionPool<Core>, chain_id: L2ChainId) -> anyhow::Result<Self> {
let mut l1_batch_params_provider = L1BatchParamsProvider::new();
let mut conn = pool.connection_tagged("vm_runner").await?;
l1_batch_params_provider.initialize(&mut conn).await?;
let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn).await?;
Ok(Self {
pool,
l1_batch_params_provider,
Expand Down Expand Up @@ -151,12 +150,11 @@ impl<Io: VmRunnerIo + Clone> VmRunnerStorage<Io> {
chain_id: L2ChainId,
) -> anyhow::Result<(Self, StorageSyncTask<Io>)> {
let mut conn = pool.connection_tagged(io.name()).await?;
let mut l1_batch_params_provider = L1BatchParamsProvider::new();
l1_batch_params_provider
.initialize(&mut conn)
let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn)
.await
.context("Failed initializing L1 batch params provider")?;
drop(conn);

let state = Arc::new(RwLock::new(State {
rocksdb: None,
l1_batch_number: L1BatchNumber(0),
Expand Down Expand Up @@ -263,9 +261,7 @@ impl<Io: VmRunnerIo> StorageSyncTask<Io> {
state: Arc<RwLock<State>>,
) -> anyhow::Result<Self> {
let mut conn = pool.connection_tagged(io.name()).await?;
let mut l1_batch_params_provider = L1BatchParamsProvider::new();
l1_batch_params_provider
.initialize(&mut conn)
let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn)
.await
.context("Failed initializing L1 batch params provider")?;
let target_l1_batch_number = io.latest_processed_batch(&mut conn).await?;
Expand Down Expand Up @@ -398,29 +394,20 @@ pub(crate) async fn load_batch_execute_data(
l1_batch_params_provider: &L1BatchParamsProvider,
chain_id: L2ChainId,
) -> anyhow::Result<Option<BatchExecuteData>> {
let first_l2_block_in_batch = l1_batch_params_provider
.load_first_l2_block_in_batch(conn, l1_batch_number)
.await
.with_context(|| {
format!(
"Failed loading first L2 block for L1 batch #{}",
l1_batch_number
)
})?;
let Some(first_l2_block_in_batch) = first_l2_block_in_batch else {
return Ok(None);
};
let (system_env, l1_batch_env) = l1_batch_params_provider
.load_l1_batch_params(
let Some((system_env, l1_batch_env)) = l1_batch_params_provider
.load_l1_batch_env(
conn,
&first_l2_block_in_batch,
l1_batch_number,
// `validation_computational_gas_limit` is only relevant when rejecting txs, but we
// are re-executing so none of them should be rejected
u32::MAX,
chain_id,
)
.await
.with_context(|| format!("Failed loading params for L1 batch #{}", l1_batch_number))?;
.await?
else {
return Ok(None);
};

let l2_blocks = conn
.transactions_dal()
.get_l2_blocks_to_execute_for_l1_batch(l1_batch_number)
Expand Down

0 comments on commit 7ffca64

Please sign in to comment.