Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix channel bank handling in Optimism derivation #71

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub mod input;
pub mod mem_db;
pub mod optimism;

mod utils;

pub use zeth_primitives::transactions::{ethereum::EthereumTxEssence, optimism::OptimismTxEssence};

/// call forget only if running inside the guest
Expand Down
16 changes: 9 additions & 7 deletions lib/src/optimism/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,21 @@ impl Batcher {
deposits: deposits::extract_transactions(&self.config, eth_block)?,
})?;

// Read frames into channels
self.batcher_channel.process_l1_transactions(
self.config.system_config.batch_sender,
eth_block.block_header.number,
&eth_block.transactions,
)?;
// process all transactions of this block to generate batches
self.batcher_channel
.process_l1_transactions(
self.config.system_config.batch_sender,
eth_block.block_header.number,
&eth_block.transactions,
)
.context("failed to process transactions")?;

// Read batches
while let Some(batches) = self.batcher_channel.read_batches() {
batches.into_iter().for_each(|batch| {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"saw batch: t={}, ph={:?}, e={}",
"received batch: timestamp={}, parent_hash={}, epoch={}",
batch.essence.timestamp,
batch.essence.parent_hash,
batch.essence.epoch_num
Expand Down
213 changes: 130 additions & 83 deletions lib/src/optimism/batcher_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use zeth_primitives::{
};

use super::config::ChainConfig;
use crate::utils::MultiReader;

pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;

Expand All @@ -50,12 +51,15 @@ impl BatcherChannels {
}
}

/// Processes all batcher transactions in the given block.
/// The given batch_sender must match the potentially updated batcher address loaded
/// from the system config.
pub fn process_l1_transactions(
&mut self,
batch_sender: Address,
block_number: BlockNumber,
transactions: &Vec<Transaction<EthereumTxEssence>>,
) -> anyhow::Result<()> {
) -> Result<()> {
for tx in transactions {
// From the spec:
// "The receiver must be the configured batcher inbox address."
Expand All @@ -65,76 +69,57 @@ impl BatcherChannels {
// From the spec:
// "The sender must match the batcher address loaded from the system config matching
// the L1 block of the data."
if tx.recover_from()? != batch_sender {
if tx.recover_from().context("invalid signature")? != batch_sender {
continue;
}

#[cfg(not(target_os = "zkvm"))]
log::debug!("received batcher tx: {}", tx.hash());

for frame in Frame::process_batcher_transaction(&tx.essence)? {
// From the spec:
// "If any one frame fails to parse, the all frames in the transaction are rejected."
let frames = match Frame::process_batcher_transaction(&tx.essence) {
Ok(frames) => frames,
Err(_err) => {
#[cfg(not(target_os = "zkvm"))]
log::warn!(
"failed to decode all frames; skip entire batcher tx: {:#}",
_err
);
continue;
}
};

// load received frames into the channel bank
for frame in frames {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"received frame: channel_id: {}, frame_number: {}, is_last: {}",
"received frame: channel_id={}, frame_number={}, is_last={}",
frame.channel_id,
frame.number,
frame.is_last
);

let frame_channel_id = frame.channel_id;

// Send the frame to its corresponding channel
{
if let Some(channel_index) = self.channel_index(frame_channel_id) {
let channel = &mut self.channels[channel_index];

// Enforce channel_timeout
if block_number > channel.open_l1_block + self.channel_timeout {
// Remove the channel. From the spec:
// "New frames for timed-out channels are dropped instead of buffered."
self.channels.remove(channel_index);
} else {
// Add frame to channel
channel.add_frame(frame).context("failed to add frame")?;
}
} else {
// Create new channel. From the spec:
// "When a channel ID referenced by a frame is not already present in the
// Channel Bank, a new channel is opened, tagged with the current L1
// block, and appended to the channel-queue"
self.channels.push_back(Channel::new(block_number, frame));
}
}

// Enforce max_channel_bank_size. From the spec:
// "After successfully inserting a new frame, the ChannelBank is pruned: channels
// are dropped in FIFO order, until total_size <= MAX_CHANNEL_BANK_SIZE."
{
let mut total_size = self.total_size();
while total_size as u64 > self.max_channel_bank_size {
let dropped_channel = self.channels.pop_front().unwrap();
total_size -= dropped_channel.size;

#[cfg(not(target_os = "zkvm"))]
log::debug!(
"pruned channel: {} (channel_size: {})",
dropped_channel.id,
dropped_channel.size
);
}
}
self.add_frame(block_number, frame);
}

// Decode batches from channel (if ready)
if let Some(channel_index) = self.channel_index(frame_channel_id) {
if self.channels[channel_index].is_ready() {
let channel = self.channels.remove(channel_index).unwrap();
// Remove all timed-out channels at the front of the queue. From the spec:
// "Upon reading, while the first opened channel is timed-out, remove it from the
// channel-bank."
while matches!(self.channels.front(), Some(channel) if block_number > channel.open_l1_block + self.channel_timeout)
{
let _channel = self.channels.pop_front().unwrap();
#[cfg(not(target_os = "zkvm"))]
log::debug!("timed-out channel: {}", _channel.id);
}

#[cfg(not(target_os = "zkvm"))]
log::debug!("received channel: {}", channel.id);
// read all ready channels from the front of the queue
while matches!(self.channels.front(), Some(channel) if channel.is_ready()) {
let channel = self.channels.pop_front().unwrap();
#[cfg(not(target_os = "zkvm"))]
log::debug!("received channel: {}", channel.id);

self.batches.push_back(channel.read_batches(block_number)?);
}
}
self.batches.push_back(channel.read_batches(block_number));
}
}

Expand All @@ -145,6 +130,57 @@ impl BatcherChannels {
self.batches.pop_front()
}

/// Adds a frame to the channel bank. Frames that cannot be added are ignored.
fn add_frame(&mut self, block_number: BlockNumber, frame: Frame) {
let channel = self
.channel_index(frame.channel_id)
.and_then(|idx| self.channels.get_mut(idx));

match channel {
Some(channel) => {
if block_number > channel.open_l1_block + self.channel_timeout {
// From the spec:
// "New frames for timed-out channels are dropped instead of buffered."
#[cfg(not(target_os = "zkvm"))]
log::warn!("frame's channel is timed out; ignored");
return;
} else if let Err(_err) = channel.add_frame(frame) {
#[cfg(not(target_os = "zkvm"))]
log::warn!("failed to add frame to channel; ignored: {:#}", _err);
return;
}
}
None => {
// Create new channel. From the spec:
// "When a channel ID referenced by a frame is not already present in the
// Channel Bank, a new channel is opened, tagged with the current L1
// block, and appended to the channel-queue"
self.channels.push_back(Channel::new(block_number, frame));
}
}

// From the spec:
// "After successfully inserting a new frame, the ChannelBank is pruned: channels
// are dropped in FIFO order, until total_size <= MAX_CHANNEL_BANK_SIZE."
self.prune();
}

/// Enforces max_channel_bank_size by dropping channels in FIFO order.
fn prune(&mut self) {
let mut total_size = self.total_size();
while total_size as u64 > self.max_channel_bank_size {
let dropped_channel = self.channels.pop_front().unwrap();
total_size -= dropped_channel.size;

#[cfg(not(target_os = "zkvm"))]
log::debug!(
"pruned channel: {} (channel_size: {})",
dropped_channel.id,
dropped_channel.size
);
}
}

fn total_size(&self) -> usize {
self.channels.iter().map(|c| c.size).sum()
}
Expand Down Expand Up @@ -244,45 +280,56 @@ impl Channel {
Ok(())
}

fn read_batches(&self, l1_block_number: BlockNumber) -> Result<Vec<Batch>> {
let decompressed = self.decompress()?;
let mut channel_data = decompressed.as_slice();
/// Reads all batches from an ready channel. If there is an invalid batch, the rest of
/// the channel is skipped, but previous batches are returned.
fn read_batches(&self, block_number: BlockNumber) -> Vec<Batch> {
debug_assert!(self.is_ready());

let mut batches = Vec::new();
if let Err(_err) = self.decode_batches(block_number, &mut batches) {
#[cfg(not(target_os = "zkvm"))]
log::warn!(
"failed to decode all batches; skipping rest of channel: {:#}",
_err
);
}

batches
}

fn decode_batches(&self, block_number: BlockNumber, batches: &mut Vec<Batch>) -> Result<()> {
let decompressed = self
.decompress()
.context("failed to decompress channel data")?;

let mut channel_data = decompressed.as_slice();
while !channel_data.is_empty() {
let mut batch =
Batch::decode(&mut channel_data).context("failed to decode batch data")?;
batch.inclusion_block_number = l1_block_number;
let mut batch = Batch::decode(&mut channel_data)
.with_context(|| format!("failed to decode batch {}", batches.len()))?;
batch.inclusion_block_number = block_number;

batches.push(batch);
}

Ok(batches)
Ok(())
}

fn decompress(&self) -> Result<Vec<u8>> {
let compressed = {
let mut buf = Vec::new();
for frame in self.frames.values() {
buf.extend(&frame.data)
}

buf
};
// chain all frames' data together
let data = MultiReader::new(self.frames.values().map(|frame| frame.data.as_slice()));

// From the spec:
// "When decompressing a channel, we limit the amount of decompressed data to
// MAX_RLP_BYTES_PER_CHANNEL (currently 10,000,000 bytes), in order to avoid "zip-bomb"
// types of attack (where a small compressed input decompresses to a humongous amount
// of data). If the decompressed data exceeds the limit, things proceeds as though the
// channel contained only the first MAX_RLP_BYTES_PER_CHANNEL decompressed bytes."
let mut decompressed = Vec::new();
Decoder::new(compressed.as_slice())?
let mut buf = Vec::new();
Decoder::new(data)?
.take(MAX_RLP_BYTES_PER_CHANNEL)
.read_to_end(&mut decompressed)
.context("failed to decompress")?;
.read_to_end(&mut buf)?;

Ok(decompressed)
Ok(buf)
}
}

Expand Down Expand Up @@ -311,13 +358,12 @@ impl Frame {
.data()
.split_first()
.context("empty transaction data")?;
ensure!(version == &0, "Invalid version: {}", version);
ensure!(version == &0, "invalid transaction version: {}", version);

let mut frames = Vec::new();
while !rollup_payload.is_empty() {
// From the spec:
// "If any one frame fails to parse, the all frames in the transaction are rejected."
let frame = Frame::decode(&mut rollup_payload).context("invalid frame")?;
let frame = Frame::decode(&mut rollup_payload)
.with_context(|| format!("failed to decode frame {}", frames.len()))?;
frames.push(frame);
}

Expand Down Expand Up @@ -508,23 +554,24 @@ mod tests {
let frame_a = Frame {
channel_id: CHANNEL_ID,
number: 1,
data: b"seven__".to_vec(),
data: vec![202, 73, 81, 4, 0, 28, 73, 4, 62],
is_last: true,
};
let frame_b = Frame {
channel_id: CHANNEL_ID,
number: 0,
data: b"four".to_vec(),
data: vec![120, 156, 243, 72, 205, 201, 201, 87, 8, 207, 47],
..Default::default()
};

let mut channel = new_channel();
channel.add_frame(frame_a).unwrap();
assert_eq!(channel.size, 207);
assert_eq!(channel.size, 209);
assert_eq!(channel.is_ready(), false);
channel.add_frame(frame_b).unwrap();
assert_eq!(channel.size, 411);
assert_eq!(channel.size, 420);
assert_eq!(channel.is_ready(), true);
assert_eq!(channel.decompress().unwrap(), b"Hello World!");
}
}
}
Expand Down
Loading
Loading