Skip to content

Commit

Permalink
Fix batch queue handling in Optimism derivation (#73)
Browse files Browse the repository at this point in the history
* sort batches in a multimap

* update copyright
  • Loading branch information
Wollac authored Jan 22, 2024
1 parent 7280614 commit 3a99983
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 171 deletions.
173 changes: 97 additions & 76 deletions lib/src/optimism/batcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 RISC Zero, Inc.
// Copyright 2024 RISC Zero, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,14 +13,11 @@
// limitations under the License.

use core::cmp::Ordering;
use std::{
cmp::Reverse,
collections::{BinaryHeap, VecDeque},
};
use std::collections::{BTreeMap, VecDeque};

use anyhow::{ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use zeth_primitives::{
batch::Batch,
batch::{Batch, BatchEssence},
transactions::{
ethereum::EthereumTxEssence,
optimism::{OptimismTxEssence, OPTIMISM_DEPOSITED_TX_TYPE},
Expand All @@ -34,9 +31,16 @@ use super::{
};

#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub struct BlockInfo {
pub struct BlockId {
pub hash: B256,
pub number: BlockNumber,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub struct L2BlockInfo {
pub hash: B256,
pub timestamp: u64,
pub l1_origin: BlockId,
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
Expand All @@ -52,7 +56,7 @@ pub struct Epoch {
pub struct State {
pub current_l1_block_number: BlockNumber,
pub current_l1_block_hash: BlockHash,
pub safe_head: BlockInfo,
pub safe_head: L2BlockInfo,
pub epoch: Epoch,
pub op_epoch_queue: VecDeque<Epoch>,
pub next_epoch: Option<Epoch>,
Expand All @@ -62,7 +66,7 @@ impl State {
pub fn new(
current_l1_block_number: BlockNumber,
current_l1_block_hash: BlockHash,
safe_head: BlockInfo,
safe_head: L2BlockInfo,
epoch: Epoch,
) -> Self {
State {
Expand All @@ -75,19 +79,19 @@ impl State {
}
}

pub fn do_next_epoch(&mut self) -> anyhow::Result<()> {
self.epoch = self.next_epoch.take().expect("No next epoch!");
pub fn do_next_epoch(&mut self) -> Result<()> {
self.epoch = self.next_epoch.take().context("no next epoch!")?;
self.deque_next_epoch_if_none()?;
Ok(())
}

pub fn push_epoch(&mut self, epoch: Epoch) -> anyhow::Result<()> {
pub fn push_epoch(&mut self, epoch: Epoch) -> Result<()> {
self.op_epoch_queue.push_back(epoch);
self.deque_next_epoch_if_none()?;
Ok(())
}

fn deque_next_epoch_if_none(&mut self) -> anyhow::Result<()> {
fn deque_next_epoch_if_none(&mut self) -> Result<()> {
if self.next_epoch.is_none() {
while let Some(next_epoch) = self.op_epoch_queue.pop_front() {
if next_epoch.number <= self.epoch.number {
Expand All @@ -96,7 +100,7 @@ impl State {
self.next_epoch = Some(next_epoch);
break;
} else {
anyhow::bail!("Epoch gap!");
bail!("Epoch gap!");
}
}
}
Expand All @@ -112,8 +116,15 @@ enum BatchStatus {
Future,
}

/// A [Batch] with inclusion information.
pub struct BatchWithInclusion {
pub essence: BatchEssence,
pub inclusion_block_number: BlockNumber,
}

pub struct Batcher {
batches: BinaryHeap<Reverse<Batch>>,
/// Multimap of batches, keyed by timestamp
batches: BTreeMap<u64, VecDeque<BatchWithInclusion>>,
batcher_channel: BatcherChannels,
pub state: State,
pub config: ChainConfig,
Expand All @@ -122,7 +133,7 @@ pub struct Batcher {
impl Batcher {
pub fn new(
config: ChainConfig,
op_head: BlockInfo,
op_head: L2BlockInfo,
eth_block: &BlockInput<EthereumTxEssence>,
) -> Result<Batcher> {
let eth_block_hash = eth_block.block_header.hash();
Expand All @@ -143,7 +154,7 @@ impl Batcher {
);

Ok(Batcher {
batches: BinaryHeap::new(),
batches: BTreeMap::new(),
batcher_channel,
state,
config,
Expand Down Expand Up @@ -198,7 +209,10 @@ impl Batcher {
batch.essence.parent_hash,
batch.essence.epoch_num
);
self.batches.push(Reverse(batch));
self.batches
.entry(batch.essence.timestamp)
.or_default()
.push_back(batch);
});
}

Expand All @@ -209,78 +223,85 @@ impl Batcher {
}

pub fn read_batch(&mut self) -> Result<Option<Batch>> {
let mut out = None;
let epoch = &self.state.epoch;
let safe_l2_head = self.state.safe_head;

ensure!(
safe_l2_head.l1_origin.hash == epoch.hash
|| safe_l2_head.l1_origin.number == epoch.number - 1,
"buffered L1 chain epoch does not match safe head origin"
);

let mut next_batch = None;

// Grab the first accepted batch. From the spec:
// "The batches are processed in order of the inclusion on L1: if multiple batches can be
// accept-ed the first is applied. An implementation can defer future batches a later
// derivation step to reduce validation work."
while let Some(Reverse(batch)) = self.batches.pop() {
match self.batch_status(&batch) {
BatchStatus::Accept => {
out = Some(batch);
break;
}
BatchStatus::Drop => {
#[cfg(not(target_os = "zkvm"))]
log::debug!("Dropping batch");
}
BatchStatus::Future => {
#[cfg(not(target_os = "zkvm"))]
log::debug!("Encountered future batch");

self.batches.push(Reverse(batch));
break;
}
BatchStatus::Undecided => {
#[cfg(not(target_os = "zkvm"))]
log::debug!("Encountered undecided batch");

self.batches.push(Reverse(batch));
break;
'outer: while let Some((ts, mut batches)) = self.batches.pop_first() {
// iterate over all batches, in order of inclusion and find the first accepted batch
// retain batches that may be processed in the future, or those we are undecided on
while let Some(batch) = batches.pop_front() {
match self.batch_status(&batch) {
BatchStatus::Accept => {
next_batch = Some(batch);
// if there are still batches left, insert them back into the map
if !batches.is_empty() {
self.batches.insert(ts, batches);
}
break 'outer;
}
BatchStatus::Drop => {}
BatchStatus::Future | BatchStatus::Undecided => {
batches.push_front(batch);
self.batches.insert(ts, batches);
break 'outer;
}
}
}
}

if let Some(batch) = next_batch {
return Ok(Some(Batch(batch.essence)));
}

// If there are no accepted batches, attempt to generate the default batch. From the spec:
// "If no batch can be accept-ed, and the stage has completed buffering of all batches
// that can fully be read from the L1 block at height epoch.number +
// sequence_window_size, and the next_epoch is available, then an empty batch can
// be derived."
if out.is_none() {
let current_l1_block = self.state.current_l1_block_number;
let safe_head = self.state.safe_head;
let current_epoch = &self.state.epoch;
let next_epoch = &self.state.next_epoch;
let seq_window_size = self.config.seq_window_size;

if let Some(next_epoch) = next_epoch {
if current_l1_block > current_epoch.number + seq_window_size {
let next_timestamp = safe_head.timestamp + self.config.blocktime;
let epoch = if next_timestamp < next_epoch.timestamp {
// From the spec:
// "If next_timestamp < next_epoch.time: the current L1 origin is repeated,
// to preserve the L2 time invariant."
current_epoch
} else {
next_epoch
};

out = Some(Batch::new(
current_l1_block,
safe_head.hash,
epoch.number,
epoch.hash,
next_timestamp,
))
}
// sequence_window_size, and the next_epoch is available, then an empty batch can be
// derived."
let current_l1_block = self.state.current_l1_block_number;
let sequence_window_size = self.config.seq_window_size;
let first_of_epoch = epoch.number == safe_l2_head.l1_origin.number + 1;

if current_l1_block > epoch.number + sequence_window_size {
if let Some(next_epoch) = &self.state.next_epoch {
let next_timestamp = safe_l2_head.timestamp + self.config.blocktime;
let batch_epoch = if next_timestamp < next_epoch.timestamp || first_of_epoch {
// From the spec:
// "If next_timestamp < next_epoch.time: the current L1 origin is repeated,
// to preserve the L2 time invariant."
// "If the batch is the first batch of the epoch, that epoch is used instead
// of advancing the epoch to ensure that there is at least one L2 block per
// epoch."
epoch
} else {
next_epoch
};

return Ok(Some(Batch::new(
safe_l2_head.hash,
batch_epoch.number,
batch_epoch.hash,
next_timestamp,
)));
}
}

Ok(out)
Ok(None)
}

fn batch_status(&self, batch: &Batch) -> BatchStatus {
fn batch_status(&self, batch: &BatchWithInclusion) -> BatchStatus {
// Apply the batch status rules. The spec describes a precise order for these checks.

let epoch = &self.state.epoch;
Expand All @@ -295,7 +316,7 @@ impl Batcher {
Ordering::Greater => {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"Future batch: {} = batch.essence.timestamp > next_timestamp = {}",
"Future batch: {} = batch.timestamp > next_timestamp = {}",
&batch.essence.timestamp,
&next_timestamp
);
Expand All @@ -304,7 +325,7 @@ impl Batcher {
Ordering::Less => {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"Batch too old: {} = batch.essence.timestamp < next_timestamp = {}",
"Batch too old: {} = batch.timestamp < next_timestamp = {}",
&batch.essence.timestamp,
&next_timestamp
);
Expand Down
24 changes: 15 additions & 9 deletions lib/src/optimism/batcher_channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 RISC Zero, Inc.
// Copyright 2024 RISC Zero, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,7 @@ use zeth_primitives::{
Address, BlockNumber,
};

use super::config::ChainConfig;
use super::{batcher::BatchWithInclusion, config::ChainConfig};
use crate::utils::MultiReader;

pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;
Expand All @@ -37,7 +37,7 @@ pub struct BatcherChannels {
max_channel_bank_size: u64,
channel_timeout: u64,
channels: VecDeque<Channel>,
batches: VecDeque<Vec<Batch>>,
batches: VecDeque<Vec<BatchWithInclusion>>,
}

impl BatcherChannels {
Expand Down Expand Up @@ -126,7 +126,7 @@ impl BatcherChannels {
Ok(())
}

pub fn read_batches(&mut self) -> Option<Vec<Batch>> {
pub fn read_batches(&mut self) -> Option<Vec<BatchWithInclusion>> {
self.batches.pop_front()
}

Expand Down Expand Up @@ -282,7 +282,7 @@ impl Channel {

/// Reads all batches from an ready channel. If there is an invalid batch, the rest of
/// the channel is skipped, but previous batches are returned.
fn read_batches(&self, block_number: BlockNumber) -> Vec<Batch> {
fn read_batches(&self, block_number: BlockNumber) -> Vec<BatchWithInclusion> {
debug_assert!(self.is_ready());

let mut batches = Vec::new();
Expand All @@ -297,18 +297,24 @@ impl Channel {
batches
}

fn decode_batches(&self, block_number: BlockNumber, batches: &mut Vec<Batch>) -> Result<()> {
fn decode_batches(
&self,
block_number: BlockNumber,
batches: &mut Vec<BatchWithInclusion>,
) -> Result<()> {
let decompressed = self
.decompress()
.context("failed to decompress channel data")?;

let mut channel_data = decompressed.as_slice();
while !channel_data.is_empty() {
let mut batch = Batch::decode(&mut channel_data)
let batch = Batch::decode(&mut channel_data)
.with_context(|| format!("failed to decode batch {}", batches.len()))?;
batch.inclusion_block_number = block_number;

batches.push(batch);
batches.push(BatchWithInclusion {
essence: batch.0,
inclusion_block_number: block_number,
});
}

Ok(())
Expand Down
Loading

0 comments on commit 3a99983

Please sign in to comment.