Skip to content

Commit

Permalink
feat(derive): Add ChannelAssembler size limitation
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby committed Oct 17, 2024
1 parent e884f6e commit 4050a5c
Showing 1 changed file with 84 additions and 2 deletions.
86 changes: 84 additions & 2 deletions crates/derive/src/stages/channel/channel_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::{hex, Bytes};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::RollupConfig;
use op_alloy_genesis::{
RollupConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
};
use op_alloy_protocol::{BlockInfo, Channel};
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -124,6 +126,22 @@ where
return Err(PipelineError::NotEnoughData.temp());
}

let max_rlp_bytes_per_channel = if self.cfg.is_fjord_active(origin.timestamp) {
MAX_RLP_BYTES_PER_CHANNEL_FJORD
} else {
MAX_RLP_BYTES_PER_CHANNEL_BEDROCK
};
if channel.size() > max_rlp_bytes_per_channel as usize {
warn!(
target: "channel-assembler",
"Compressed channel size exceeded max RLP bytes per channel, dropping channel (ID: {}) with {} bytes",
hex::encode(channel.id()),
channel.size()
);
self.channel = None;
return Err(PipelineError::NotEnoughData.temp());
}

// If the channel is ready, forward the channel to the next stage.
if channel.is_ready() {
let channel_bytes =
Expand Down Expand Up @@ -186,7 +204,9 @@ mod test {
test_utils::{CollectingLayer, TestNextFrameProvider, TraceStorage},
};
use alloc::sync::Arc;
use op_alloy_genesis::RollupConfig;
use op_alloy_genesis::{
RollupConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
};
use op_alloy_protocol::BlockInfo;
use tracing::Level;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand Down Expand Up @@ -281,4 +301,66 @@ mod test {
let error_str = "Failed to add frame to channel";
assert!(error_logs[0].contains(error_str));
}

#[tokio::test]
async fn test_assembler_size_limit_exceeded_bedrock() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

let mut frames = new_test_frames(2);
frames[1].data = vec![0; MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize];
let mock = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect());
let cfg = Arc::new(RollupConfig::default());

let mut assembler = ChannelAssembler::new(cfg, mock);

// Send in the first frame. This should result in a channel being created.
assert!(assembler.channel.is_none());
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_some());

// Send in the second frame. This should result in the channel being dropped due to the size
// limit being reached.
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_none());

let trace_store_lock = trace_store.lock();
assert_eq!(trace_store_lock.iter().filter(|(l, _)| matches!(l, &Level::WARN)).count(), 1);

let (_, message) =
trace_store_lock.iter().find(|(l, _)| matches!(l, &Level::WARN)).unwrap();
assert!(message.contains("Compressed channel size exceeded max RLP bytes per channel"));
}

#[tokio::test]
async fn test_assembler_size_limit_exceeded_fjord() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

let mut frames = new_test_frames(2);
frames[1].data = vec![0; MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize];
let mock = TestNextFrameProvider::new(frames.into_iter().rev().map(Ok).collect());
let cfg = Arc::new(RollupConfig { fjord_time: Some(0), ..Default::default() });

let mut assembler = ChannelAssembler::new(cfg, mock);

// Send in the first frame. This should result in a channel being created.
assert!(assembler.channel.is_none());
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_some());

// Send in the second frame. This should result in the channel being dropped due to the size
// limit being reached.
assert_eq!(assembler.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp());
assert!(assembler.channel.is_none());

let trace_store_lock = trace_store.lock();
assert_eq!(trace_store_lock.iter().filter(|(l, _)| matches!(l, &Level::WARN)).count(), 1);

let (_, message) =
trace_store_lock.iter().find(|(l, _)| matches!(l, &Level::WARN)).unwrap();
assert!(message.contains("Compressed channel size exceeded max RLP bytes per channel"));
}
}

0 comments on commit 4050a5c

Please sign in to comment.