From fbe42559561f98d313a8ab91f7777188c958d1ab Mon Sep 17 00:00:00 2001 From: Wolfgang Welz Date: Fri, 22 Dec 2023 15:52:52 +0100 Subject: [PATCH] Fix channel bank handling in Optimism derivation (#71) * fix channel bank * channel_index is not mut --- lib/src/lib.rs | 2 + lib/src/optimism/batcher.rs | 16 ++- lib/src/optimism/batcher_channel.rs | 213 +++++++++++++++++----------- lib/src/utils.rs | 63 ++++++++ 4 files changed, 204 insertions(+), 90 deletions(-) create mode 100644 lib/src/utils.rs diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ab0bd361..633ebec8 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -23,6 +23,8 @@ pub mod input; pub mod mem_db; pub mod optimism; +mod utils; + pub use zeth_primitives::transactions::{ethereum::EthereumTxEssence, optimism::OptimismTxEssence}; /// call forget only if running inside the guest diff --git a/lib/src/optimism/batcher.rs b/lib/src/optimism/batcher.rs index c0eabe1a..266b0ab4 100644 --- a/lib/src/optimism/batcher.rs +++ b/lib/src/optimism/batcher.rs @@ -179,19 +179,21 @@ impl Batcher { deposits: deposits::extract_transactions(&self.config, eth_block)?, })?; - // Read frames into channels - self.batcher_channel.process_l1_transactions( - self.config.system_config.batch_sender, - eth_block.block_header.number, - ð_block.transactions, - )?; + // process all transactions of this block to generate batches + self.batcher_channel + .process_l1_transactions( + self.config.system_config.batch_sender, + eth_block.block_header.number, + ð_block.transactions, + ) + .context("failed to process transactions")?; // Read batches while let Some(batches) = self.batcher_channel.read_batches() { batches.into_iter().for_each(|batch| { #[cfg(not(target_os = "zkvm"))] log::debug!( - "saw batch: t={}, ph={:?}, e={}", + "received batch: timestamp={}, parent_hash={}, epoch={}", batch.essence.timestamp, batch.essence.parent_hash, batch.essence.epoch_num diff --git a/lib/src/optimism/batcher_channel.rs b/lib/src/optimism/batcher_channel.rs index 05838d1b..4770670e 100644 --- a/lib/src/optimism/batcher_channel.rs +++ b/lib/src/optimism/batcher_channel.rs @@ -28,6 +28,7 @@ use zeth_primitives::{ }; use super::config::ChainConfig; +use crate::utils::MultiReader; pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000; @@ -50,12 +51,15 @@ impl BatcherChannels { } } + /// Processes all batcher transactions in the given block. + /// The given batch_sender must match the potentially updated batcher address loaded + /// from the system config. pub fn process_l1_transactions( &mut self, batch_sender: Address, block_number: BlockNumber, transactions: &Vec>, - ) -> anyhow::Result<()> { + ) -> Result<()> { for tx in transactions { // From the spec: // "The receiver must be the configured batcher inbox address." @@ -65,76 +69,57 @@ impl BatcherChannels { // From the spec: // "The sender must match the batcher address loaded from the system config matching // the L1 block of the data." - if tx.recover_from()? != batch_sender { + if tx.recover_from().context("invalid signature")? != batch_sender { continue; } #[cfg(not(target_os = "zkvm"))] log::debug!("received batcher tx: {}", tx.hash()); - for frame in Frame::process_batcher_transaction(&tx.essence)? { + // From the spec: + // "If any one frame fails to parse, the all frames in the transaction are rejected." + let frames = match Frame::process_batcher_transaction(&tx.essence) { + Ok(frames) => frames, + Err(_err) => { + #[cfg(not(target_os = "zkvm"))] + log::warn!( + "failed to decode all frames; skip entire batcher tx: {:#}", + _err + ); + continue; + } + }; + + // load received frames into the channel bank + for frame in frames { #[cfg(not(target_os = "zkvm"))] log::debug!( - "received frame: channel_id: {}, frame_number: {}, is_last: {}", + "received frame: channel_id={}, frame_number={}, is_last={}", frame.channel_id, frame.number, frame.is_last ); - let frame_channel_id = frame.channel_id; - - // Send the frame to its corresponding channel - { - if let Some(channel_index) = self.channel_index(frame_channel_id) { - let channel = &mut self.channels[channel_index]; - - // Enforce channel_timeout - if block_number > channel.open_l1_block + self.channel_timeout { - // Remove the channel. From the spec: - // "New frames for timed-out channels are dropped instead of buffered." - self.channels.remove(channel_index); - } else { - // Add frame to channel - channel.add_frame(frame).context("failed to add frame")?; - } - } else { - // Create new channel. From the spec: - // "When a channel ID referenced by a frame is not already present in the - // Channel Bank, a new channel is opened, tagged with the current L1 - // block, and appended to the channel-queue" - self.channels.push_back(Channel::new(block_number, frame)); - } - } - - // Enforce max_channel_bank_size. From the spec: - // "After successfully inserting a new frame, the ChannelBank is pruned: channels - // are dropped in FIFO order, until total_size <= MAX_CHANNEL_BANK_SIZE." - { - let mut total_size = self.total_size(); - while total_size as u64 > self.max_channel_bank_size { - let dropped_channel = self.channels.pop_front().unwrap(); - total_size -= dropped_channel.size; - - #[cfg(not(target_os = "zkvm"))] - log::debug!( - "pruned channel: {} (channel_size: {})", - dropped_channel.id, - dropped_channel.size - ); - } - } + self.add_frame(block_number, frame); + } - // Decode batches from channel (if ready) - if let Some(channel_index) = self.channel_index(frame_channel_id) { - if self.channels[channel_index].is_ready() { - let channel = self.channels.remove(channel_index).unwrap(); + // Remove all timed-out channels at the front of the queue. From the spec: + // "Upon reading, while the first opened channel is timed-out, remove it from the + // channel-bank." + while matches!(self.channels.front(), Some(channel) if block_number > channel.open_l1_block + self.channel_timeout) + { + let _channel = self.channels.pop_front().unwrap(); + #[cfg(not(target_os = "zkvm"))] + log::debug!("timed-out channel: {}", _channel.id); + } - #[cfg(not(target_os = "zkvm"))] - log::debug!("received channel: {}", channel.id); + // read all ready channels from the front of the queue + while matches!(self.channels.front(), Some(channel) if channel.is_ready()) { + let channel = self.channels.pop_front().unwrap(); + #[cfg(not(target_os = "zkvm"))] + log::debug!("received channel: {}", channel.id); - self.batches.push_back(channel.read_batches(block_number)?); - } - } + self.batches.push_back(channel.read_batches(block_number)); } } @@ -145,6 +130,57 @@ impl BatcherChannels { self.batches.pop_front() } + /// Adds a frame to the channel bank. Frames that cannot be added are ignored. + fn add_frame(&mut self, block_number: BlockNumber, frame: Frame) { + let channel = self + .channel_index(frame.channel_id) + .and_then(|idx| self.channels.get_mut(idx)); + + match channel { + Some(channel) => { + if block_number > channel.open_l1_block + self.channel_timeout { + // From the spec: + // "New frames for timed-out channels are dropped instead of buffered." + #[cfg(not(target_os = "zkvm"))] + log::warn!("frame's channel is timed out; ignored"); + return; + } else if let Err(_err) = channel.add_frame(frame) { + #[cfg(not(target_os = "zkvm"))] + log::warn!("failed to add frame to channel; ignored: {:#}", _err); + return; + } + } + None => { + // Create new channel. From the spec: + // "When a channel ID referenced by a frame is not already present in the + // Channel Bank, a new channel is opened, tagged with the current L1 + // block, and appended to the channel-queue" + self.channels.push_back(Channel::new(block_number, frame)); + } + } + + // From the spec: + // "After successfully inserting a new frame, the ChannelBank is pruned: channels + // are dropped in FIFO order, until total_size <= MAX_CHANNEL_BANK_SIZE." + self.prune(); + } + + /// Enforces max_channel_bank_size by dropping channels in FIFO order. + fn prune(&mut self) { + let mut total_size = self.total_size(); + while total_size as u64 > self.max_channel_bank_size { + let dropped_channel = self.channels.pop_front().unwrap(); + total_size -= dropped_channel.size; + + #[cfg(not(target_os = "zkvm"))] + log::debug!( + "pruned channel: {} (channel_size: {})", + dropped_channel.id, + dropped_channel.size + ); + } + } + fn total_size(&self) -> usize { self.channels.iter().map(|c| c.size).sum() } @@ -244,31 +280,43 @@ impl Channel { Ok(()) } - fn read_batches(&self, l1_block_number: BlockNumber) -> Result> { - let decompressed = self.decompress()?; - let mut channel_data = decompressed.as_slice(); + /// Reads all batches from an ready channel. If there is an invalid batch, the rest of + /// the channel is skipped, but previous batches are returned. + fn read_batches(&self, block_number: BlockNumber) -> Vec { + debug_assert!(self.is_ready()); + let mut batches = Vec::new(); + if let Err(_err) = self.decode_batches(block_number, &mut batches) { + #[cfg(not(target_os = "zkvm"))] + log::warn!( + "failed to decode all batches; skipping rest of channel: {:#}", + _err + ); + } + + batches + } + fn decode_batches(&self, block_number: BlockNumber, batches: &mut Vec) -> Result<()> { + let decompressed = self + .decompress() + .context("failed to decompress channel data")?; + + let mut channel_data = decompressed.as_slice(); while !channel_data.is_empty() { - let mut batch = - Batch::decode(&mut channel_data).context("failed to decode batch data")?; - batch.inclusion_block_number = l1_block_number; + let mut batch = Batch::decode(&mut channel_data) + .with_context(|| format!("failed to decode batch {}", batches.len()))?; + batch.inclusion_block_number = block_number; batches.push(batch); } - Ok(batches) + Ok(()) } fn decompress(&self) -> Result> { - let compressed = { - let mut buf = Vec::new(); - for frame in self.frames.values() { - buf.extend(&frame.data) - } - - buf - }; + // chain all frames' data together + let data = MultiReader::new(self.frames.values().map(|frame| frame.data.as_slice())); // From the spec: // "When decompressing a channel, we limit the amount of decompressed data to @@ -276,13 +324,12 @@ impl Channel { // types of attack (where a small compressed input decompresses to a humongous amount // of data). If the decompressed data exceeds the limit, things proceeds as though the // channel contained only the first MAX_RLP_BYTES_PER_CHANNEL decompressed bytes." - let mut decompressed = Vec::new(); - Decoder::new(compressed.as_slice())? + let mut buf = Vec::new(); + Decoder::new(data)? .take(MAX_RLP_BYTES_PER_CHANNEL) - .read_to_end(&mut decompressed) - .context("failed to decompress")?; + .read_to_end(&mut buf)?; - Ok(decompressed) + Ok(buf) } } @@ -311,13 +358,12 @@ impl Frame { .data() .split_first() .context("empty transaction data")?; - ensure!(version == &0, "Invalid version: {}", version); + ensure!(version == &0, "invalid transaction version: {}", version); let mut frames = Vec::new(); while !rollup_payload.is_empty() { - // From the spec: - // "If any one frame fails to parse, the all frames in the transaction are rejected." - let frame = Frame::decode(&mut rollup_payload).context("invalid frame")?; + let frame = Frame::decode(&mut rollup_payload) + .with_context(|| format!("failed to decode frame {}", frames.len()))?; frames.push(frame); } @@ -508,23 +554,24 @@ mod tests { let frame_a = Frame { channel_id: CHANNEL_ID, number: 1, - data: b"seven__".to_vec(), + data: vec![202, 73, 81, 4, 0, 28, 73, 4, 62], is_last: true, }; let frame_b = Frame { channel_id: CHANNEL_ID, number: 0, - data: b"four".to_vec(), + data: vec![120, 156, 243, 72, 205, 201, 201, 87, 8, 207, 47], ..Default::default() }; let mut channel = new_channel(); channel.add_frame(frame_a).unwrap(); - assert_eq!(channel.size, 207); + assert_eq!(channel.size, 209); assert_eq!(channel.is_ready(), false); channel.add_frame(frame_b).unwrap(); - assert_eq!(channel.size, 411); + assert_eq!(channel.size, 420); assert_eq!(channel.is_ready(), true); + assert_eq!(channel.decompress().unwrap(), b"Hello World!"); } } } diff --git a/lib/src/utils.rs b/lib/src/utils.rs new file mode 100644 index 00000000..db91e675 --- /dev/null +++ b/lib/src/utils.rs @@ -0,0 +1,63 @@ +// Copyright 2023 RISC Zero, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{io, io::Read}; + +/// An adaptor that chains multiple readers together. +pub struct MultiReader { + readers: I, + current: Option, +} + +impl MultiReader +where + I: IntoIterator, + R: Read, +{ + /// Creates a new instance of `MultiReader`. + /// + /// This function takes an iterator over readers and returns a `MultiReader`. + pub fn new(readers: I) -> MultiReader { + let mut readers = readers.into_iter(); + let current = readers.next(); + MultiReader { readers, current } + } +} + +/// Implementation of the `Read` trait for `MultiReader`. +impl Read for MultiReader +where + I: Iterator, + R: Read, +{ + /// Reads data from the current reader into a buffer. + /// + /// This function reads as much data as possible from the current reader into the + /// buffer, and switches to the next reader when the current one is exhausted. + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + loop { + match self.current { + Some(ref mut r) => { + let n = r.read(buf)?; + if n > 0 { + return Ok(n); + } + } + None => return Ok(0), + } + self.current = self.readers.next(); + } + } +}