Skip to content

Commit

Permalink
Fix channel bank handling in Optimism derivation (#71)
Browse files Browse the repository at this point in the history
* fix channel bank

* channel_index is not mut
  • Loading branch information
Wollac authored Dec 22, 2023
1 parent 0b2efed commit fbe4255
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 90 deletions.
2 changes: 2 additions & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub mod input;
pub mod mem_db;
pub mod optimism;

mod utils;

pub use zeth_primitives::transactions::{ethereum::EthereumTxEssence, optimism::OptimismTxEssence};

/// call forget only if running inside the guest
Expand Down
16 changes: 9 additions & 7 deletions lib/src/optimism/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,21 @@ impl Batcher {
deposits: deposits::extract_transactions(&self.config, eth_block)?,
})?;

// Read frames into channels
self.batcher_channel.process_l1_transactions(
self.config.system_config.batch_sender,
eth_block.block_header.number,
&eth_block.transactions,
)?;
// process all transactions of this block to generate batches
self.batcher_channel
.process_l1_transactions(
self.config.system_config.batch_sender,
eth_block.block_header.number,
&eth_block.transactions,
)
.context("failed to process transactions")?;

// Read batches
while let Some(batches) = self.batcher_channel.read_batches() {
batches.into_iter().for_each(|batch| {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"saw batch: t={}, ph={:?}, e={}",
"received batch: timestamp={}, parent_hash={}, epoch={}",
batch.essence.timestamp,
batch.essence.parent_hash,
batch.essence.epoch_num
Expand Down
213 changes: 130 additions & 83 deletions lib/src/optimism/batcher_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use zeth_primitives::{
};

use super::config::ChainConfig;
use crate::utils::MultiReader;

pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;

Expand All @@ -50,12 +51,15 @@ impl BatcherChannels {
}
}

/// Processes all batcher transactions in the given block.
/// The given batch_sender must match the potentially updated batcher address loaded
/// from the system config.
pub fn process_l1_transactions(
&mut self,
batch_sender: Address,
block_number: BlockNumber,
transactions: &Vec<Transaction<EthereumTxEssence>>,
) -> anyhow::Result<()> {
) -> Result<()> {
for tx in transactions {
// From the spec:
// "The receiver must be the configured batcher inbox address."
Expand All @@ -65,76 +69,57 @@ impl BatcherChannels {
// From the spec:
// "The sender must match the batcher address loaded from the system config matching
// the L1 block of the data."
if tx.recover_from()? != batch_sender {
if tx.recover_from().context("invalid signature")? != batch_sender {
continue;
}

#[cfg(not(target_os = "zkvm"))]
log::debug!("received batcher tx: {}", tx.hash());

for frame in Frame::process_batcher_transaction(&tx.essence)? {
// From the spec:
// "If any one frame fails to parse, the all frames in the transaction are rejected."
let frames = match Frame::process_batcher_transaction(&tx.essence) {
Ok(frames) => frames,
Err(_err) => {
#[cfg(not(target_os = "zkvm"))]
log::warn!(
"failed to decode all frames; skip entire batcher tx: {:#}",
_err
);
continue;
}
};

// load received frames into the channel bank
for frame in frames {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"received frame: channel_id: {}, frame_number: {}, is_last: {}",
"received frame: channel_id={}, frame_number={}, is_last={}",
frame.channel_id,
frame.number,
frame.is_last
);

let frame_channel_id = frame.channel_id;

// Send the frame to its corresponding channel
{
if let Some(channel_index) = self.channel_index(frame_channel_id) {
let channel = &mut self.channels[channel_index];

// Enforce channel_timeout
if block_number > channel.open_l1_block + self.channel_timeout {
// Remove the channel. From the spec:
// "New frames for timed-out channels are dropped instead of buffered."
self.channels.remove(channel_index);
} else {
// Add frame to channel
channel.add_frame(frame).context("failed to add frame")?;
}
} else {
// Create new channel. From the spec:
// "When a channel ID referenced by a frame is not already present in the
// Channel Bank, a new channel is opened, tagged with the current L1
// block, and appended to the channel-queue"
self.channels.push_back(Channel::new(block_number, frame));
}
}

// Enforce max_channel_bank_size. From the spec:
// "After successfully inserting a new frame, the ChannelBank is pruned: channels
// are dropped in FIFO order, until total_size <= MAX_CHANNEL_BANK_SIZE."
{
let mut total_size = self.total_size();
while total_size as u64 > self.max_channel_bank_size {
let dropped_channel = self.channels.pop_front().unwrap();
total_size -= dropped_channel.size;

#[cfg(not(target_os = "zkvm"))]
log::debug!(
"pruned channel: {} (channel_size: {})",
dropped_channel.id,
dropped_channel.size
);
}
}
self.add_frame(block_number, frame);
}

// Decode batches from channel (if ready)
if let Some(channel_index) = self.channel_index(frame_channel_id) {
if self.channels[channel_index].is_ready() {
let channel = self.channels.remove(channel_index).unwrap();
// Remove all timed-out channels at the front of the queue. From the spec:
// "Upon reading, while the first opened channel is timed-out, remove it from the
// channel-bank."
while matches!(self.channels.front(), Some(channel) if block_number > channel.open_l1_block + self.channel_timeout)
{
let _channel = self.channels.pop_front().unwrap();
#[cfg(not(target_os = "zkvm"))]
log::debug!("timed-out channel: {}", _channel.id);
}

#[cfg(not(target_os = "zkvm"))]
log::debug!("received channel: {}", channel.id);
// read all ready channels from the front of the queue
while matches!(self.channels.front(), Some(channel) if channel.is_ready()) {
let channel = self.channels.pop_front().unwrap();
#[cfg(not(target_os = "zkvm"))]
log::debug!("received channel: {}", channel.id);

self.batches.push_back(channel.read_batches(block_number)?);
}
}
self.batches.push_back(channel.read_batches(block_number));
}
}

Expand All @@ -145,6 +130,57 @@ impl BatcherChannels {
self.batches.pop_front()
}

/// Adds a frame to the channel bank. Frames that cannot be added are ignored.
fn add_frame(&mut self, block_number: BlockNumber, frame: Frame) {
let channel = self
.channel_index(frame.channel_id)
.and_then(|idx| self.channels.get_mut(idx));

match channel {
Some(channel) => {
if block_number > channel.open_l1_block + self.channel_timeout {
// From the spec:
// "New frames for timed-out channels are dropped instead of buffered."
#[cfg(not(target_os = "zkvm"))]
log::warn!("frame's channel is timed out; ignored");
return;
} else if let Err(_err) = channel.add_frame(frame) {
#[cfg(not(target_os = "zkvm"))]
log::warn!("failed to add frame to channel; ignored: {:#}", _err);
return;
}
}
None => {
// Create new channel. From the spec:
// "When a channel ID referenced by a frame is not already present in the
// Channel Bank, a new channel is opened, tagged with the current L1
// block, and appended to the channel-queue"
self.channels.push_back(Channel::new(block_number, frame));
}
}

// From the spec:
// "After successfully inserting a new frame, the ChannelBank is pruned: channels
// are dropped in FIFO order, until total_size <= MAX_CHANNEL_BANK_SIZE."
self.prune();
}

/// Enforces max_channel_bank_size by dropping channels in FIFO order.
fn prune(&mut self) {
let mut total_size = self.total_size();
while total_size as u64 > self.max_channel_bank_size {
let dropped_channel = self.channels.pop_front().unwrap();
total_size -= dropped_channel.size;

#[cfg(not(target_os = "zkvm"))]
log::debug!(
"pruned channel: {} (channel_size: {})",
dropped_channel.id,
dropped_channel.size
);
}
}

fn total_size(&self) -> usize {
self.channels.iter().map(|c| c.size).sum()
}
Expand Down Expand Up @@ -244,45 +280,56 @@ impl Channel {
Ok(())
}

fn read_batches(&self, l1_block_number: BlockNumber) -> Result<Vec<Batch>> {
let decompressed = self.decompress()?;
let mut channel_data = decompressed.as_slice();
/// Reads all batches from an ready channel. If there is an invalid batch, the rest of
/// the channel is skipped, but previous batches are returned.
fn read_batches(&self, block_number: BlockNumber) -> Vec<Batch> {
debug_assert!(self.is_ready());

let mut batches = Vec::new();
if let Err(_err) = self.decode_batches(block_number, &mut batches) {
#[cfg(not(target_os = "zkvm"))]
log::warn!(
"failed to decode all batches; skipping rest of channel: {:#}",
_err
);
}

batches
}

fn decode_batches(&self, block_number: BlockNumber, batches: &mut Vec<Batch>) -> Result<()> {
let decompressed = self
.decompress()
.context("failed to decompress channel data")?;

let mut channel_data = decompressed.as_slice();
while !channel_data.is_empty() {
let mut batch =
Batch::decode(&mut channel_data).context("failed to decode batch data")?;
batch.inclusion_block_number = l1_block_number;
let mut batch = Batch::decode(&mut channel_data)
.with_context(|| format!("failed to decode batch {}", batches.len()))?;
batch.inclusion_block_number = block_number;

batches.push(batch);
}

Ok(batches)
Ok(())
}

fn decompress(&self) -> Result<Vec<u8>> {
let compressed = {
let mut buf = Vec::new();
for frame in self.frames.values() {
buf.extend(&frame.data)
}

buf
};
// chain all frames' data together
let data = MultiReader::new(self.frames.values().map(|frame| frame.data.as_slice()));

// From the spec:
// "When decompressing a channel, we limit the amount of decompressed data to
// MAX_RLP_BYTES_PER_CHANNEL (currently 10,000,000 bytes), in order to avoid "zip-bomb"
// types of attack (where a small compressed input decompresses to a humongous amount
// of data). If the decompressed data exceeds the limit, things proceeds as though the
// channel contained only the first MAX_RLP_BYTES_PER_CHANNEL decompressed bytes."
let mut decompressed = Vec::new();
Decoder::new(compressed.as_slice())?
let mut buf = Vec::new();
Decoder::new(data)?
.take(MAX_RLP_BYTES_PER_CHANNEL)
.read_to_end(&mut decompressed)
.context("failed to decompress")?;
.read_to_end(&mut buf)?;

Ok(decompressed)
Ok(buf)
}
}

Expand Down Expand Up @@ -311,13 +358,12 @@ impl Frame {
.data()
.split_first()
.context("empty transaction data")?;
ensure!(version == &0, "Invalid version: {}", version);
ensure!(version == &0, "invalid transaction version: {}", version);

let mut frames = Vec::new();
while !rollup_payload.is_empty() {
// From the spec:
// "If any one frame fails to parse, the all frames in the transaction are rejected."
let frame = Frame::decode(&mut rollup_payload).context("invalid frame")?;
let frame = Frame::decode(&mut rollup_payload)
.with_context(|| format!("failed to decode frame {}", frames.len()))?;
frames.push(frame);
}

Expand Down Expand Up @@ -508,23 +554,24 @@ mod tests {
let frame_a = Frame {
channel_id: CHANNEL_ID,
number: 1,
data: b"seven__".to_vec(),
data: vec![202, 73, 81, 4, 0, 28, 73, 4, 62],
is_last: true,
};
let frame_b = Frame {
channel_id: CHANNEL_ID,
number: 0,
data: b"four".to_vec(),
data: vec![120, 156, 243, 72, 205, 201, 201, 87, 8, 207, 47],
..Default::default()
};

let mut channel = new_channel();
channel.add_frame(frame_a).unwrap();
assert_eq!(channel.size, 207);
assert_eq!(channel.size, 209);
assert_eq!(channel.is_ready(), false);
channel.add_frame(frame_b).unwrap();
assert_eq!(channel.size, 411);
assert_eq!(channel.size, 420);
assert_eq!(channel.is_ready(), true);
assert_eq!(channel.decompress().unwrap(), b"Hello World!");
}
}
}
Expand Down
Loading

0 comments on commit fbe4255

Please sign in to comment.