From 4050a5c323e15677eafc626402b4b094573d00be Mon Sep 17 00:00:00 2001 From: clabby Date: Wed, 16 Oct 2024 20:38:07 -0400 Subject: [PATCH] feat(derive): Add `ChannelAssembler` size limitation --- .../src/stages/channel/channel_assembler.rs | 86 ++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/crates/derive/src/stages/channel/channel_assembler.rs b/crates/derive/src/stages/channel/channel_assembler.rs index b727baa3..609ec7c7 100644 --- a/crates/derive/src/stages/channel/channel_assembler.rs +++ b/crates/derive/src/stages/channel/channel_assembler.rs @@ -9,7 +9,9 @@ use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::{hex, Bytes}; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::RollupConfig; +use op_alloy_genesis::{ + RollupConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD, +}; use op_alloy_protocol::{BlockInfo, Channel}; use tracing::{debug, error, info, warn}; @@ -124,6 +126,22 @@ where return Err(PipelineError::NotEnoughData.temp()); } + let max_rlp_bytes_per_channel = if self.cfg.is_fjord_active(origin.timestamp) { + MAX_RLP_BYTES_PER_CHANNEL_FJORD + } else { + MAX_RLP_BYTES_PER_CHANNEL_BEDROCK + }; + if channel.size() > max_rlp_bytes_per_channel as usize { + warn!( + target: "channel-assembler", + "Compressed channel size exceeded max RLP bytes per channel, dropping channel (ID: {}) with {} bytes", + hex::encode(channel.id()), + channel.size() + ); + self.channel = None; + return Err(PipelineError::NotEnoughData.temp()); + } + // If the channel is ready, forward the channel to the next stage. if channel.is_ready() { let channel_bytes = @@ -186,7 +204,9 @@ mod test { test_utils::{CollectingLayer, TestNextFrameProvider, TraceStorage}, }; use alloc::sync::Arc; - use op_alloy_genesis::RollupConfig; + use op_alloy_genesis::{ + RollupConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD, + }; use op_alloy_protocol::BlockInfo; use tracing::Level; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -281,4 +301,66 @@ mod test { let error_str = "Failed to add frame to channel"; assert!(error_logs[0].contains(error_str)); } + + #[tokio::test] + async fn test_assembler_size_limit_exceeded_bedrock() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + let mut frames = new_test_frames(2); + frames[1].data = vec![0; MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize]; + let mock = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect()); + let cfg = Arc::new(RollupConfig::default()); + + let mut assembler = ChannelAssembler::new(cfg, mock); + + // Send in the first frame. This should result in a channel being created. + assert!(assembler.channel.is_none()); + assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp()); + assert!(assembler.channel.is_some()); + + // Send in the second frame. This should result in the channel being dropped due to the size + // limit being reached. + assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp()); + assert!(assembler.channel.is_none()); + + let trace_store_lock = trace_store.lock(); + assert_eq!(trace_store_lock.iter().filter(|(l, _)| matches!(l, &Level::WARN)).count(), 1); + + let (_, message) = + trace_store_lock.iter().find(|(l, _)| matches!(l, &Level::WARN)).unwrap(); + assert!(message.contains("Compressed channel size exceeded max RLP bytes per channel")); + } + + #[tokio::test] + async fn test_assembler_size_limit_exceeded_fjord() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + let mut frames = new_test_frames(2); + frames[1].data = vec![0; MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize]; + let mock = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect()); + let cfg = Arc::new(RollupConfig { fjord_time: Some(0), ..Default::default() }); + + let mut assembler = ChannelAssembler::new(cfg, mock); + + // Send in the first frame. This should result in a channel being created. + assert!(assembler.channel.is_none()); + assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp()); + assert!(assembler.channel.is_some()); + + // Send in the second frame. This should result in the channel being dropped due to the size + // limit being reached. + assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp()); + assert!(assembler.channel.is_none()); + + let trace_store_lock = trace_store.lock(); + assert_eq!(trace_store_lock.iter().filter(|(l, _)| matches!(l, &Level::WARN)).count(), 1); + + let (_, message) = + trace_store_lock.iter().find(|(l, _)| matches!(l, &Level::WARN)).unwrap(); + assert!(message.contains("Compressed channel size exceeded max RLP bytes per channel")); + } }