From 391da87f1a11ccbc7f43514146433a6928d1129a Mon Sep 17 00:00:00 2001 From: refcell Date: Mon, 30 Sep 2024 11:36:16 -0400 Subject: [PATCH] fix: derive pipeline params (#587) --- crates/derive/src/lib.rs | 1 - crates/derive/src/params.rs | 16 --------- crates/derive/src/stages/channel_bank.rs | 43 ++++++++++++++++++++++-- crates/derive/src/stages/l1_traversal.rs | 11 ++---- 4 files changed, 43 insertions(+), 28 deletions(-) delete mode 100644 crates/derive/src/params.rs diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 409b124a..c6348091 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -11,7 +11,6 @@ pub mod attributes; pub mod batch; pub mod block; pub mod errors; -pub mod params; pub mod pipeline; pub mod sources; pub mod stages; diff --git a/crates/derive/src/params.rs b/crates/derive/src/params.rs deleted file mode 100644 index 21c044a4..00000000 --- a/crates/derive/src/params.rs +++ /dev/null @@ -1,16 +0,0 @@ -//! This module contains the parameters and identifying types for the derivation pipeline. - -use alloy_primitives::{b256, B256}; - -/// `keccak256("ConfigUpdate(uint256,uint8,bytes)")` -pub const CONFIG_UPDATE_TOPIC: B256 = - b256!("1d2b0bda21d56b8bd12d4f94ebacffdfb35f5e226f84b461103bb8beab6353be"); - -/// The initial version of the system config event log. -pub const CONFIG_UPDATE_EVENT_VERSION_0: B256 = B256::ZERO; - -/// The maximum size of a channel bank. -pub const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000; - -/// The maximum size of a channel bank after the Fjord Hardfork. -pub const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000; diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 50a2fdee..6e99aad2 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -2,7 +2,6 @@ use crate::{ errors::{PipelineError, PipelineErrorKind, PipelineResult}, - params::MAX_CHANNEL_BANK_SIZE, stages::ChannelReaderProvider, traits::{OriginAdvancer, OriginProvider, ResettableStage}, }; @@ -15,6 +14,12 @@ use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::{BlockInfo, Channel, ChannelId, Frame}; use tracing::{trace, warn}; +/// The maximum size of a channel bank. +pub(crate) const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000; + +/// The maximum size of a channel bank after the Fjord Hardfork. +pub(crate) const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000; + /// Provides frames for the [ChannelBank] stage. #[async_trait] pub trait ChannelBankProvider { @@ -65,11 +70,17 @@ where self.channels.iter().fold(0, |acc, (_, c)| acc + c.size()) } - /// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE]. + /// Prunes the Channel bank, until it is below the max channel bank size. /// Prunes from the high-priority channel since it failed to be read. pub fn prune(&mut self) -> PipelineResult<()> { let mut total_size = self.size(); - while total_size > MAX_CHANNEL_BANK_SIZE { + let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + let max_channel_bank_size = if self.cfg.is_fjord_active(origin.timestamp) { + FJORD_MAX_CHANNEL_BANK_SIZE + } else { + MAX_CHANNEL_BANK_SIZE + }; + while total_size > max_channel_bank_size { let id = self.channel_queue.pop_front().ok_or(PipelineError::ChannelBankEmpty.crit())?; let channel = self.channels.remove(&id).ok_or(PipelineError::ChannelNotFound.crit())?; @@ -374,6 +385,32 @@ mod tests { assert_eq!(channel_bank.size(), current_size); } + #[test] + fn test_ingest_and_prune_channel_bank_fjord() { + use alloc::vec::Vec; + let mut frames: Vec = new_test_frames(100000); + let mock = MockChannelBankProvider::new(vec![]); + let cfg = Arc::new(RollupConfig { fjord_time: Some(0), ..Default::default() }); + let mut channel_bank = ChannelBank::new(cfg, mock); + // Ingest frames until the channel bank is full and it stops increasing in size + let mut current_size = 0; + let next_frame = frames.pop().unwrap(); + channel_bank.ingest_frame(next_frame).unwrap(); + while channel_bank.size() > current_size { + current_size = channel_bank.size(); + let next_frame = frames.pop().unwrap(); + channel_bank.ingest_frame(next_frame).unwrap(); + assert!(channel_bank.size() <= FJORD_MAX_CHANNEL_BANK_SIZE); + } + // There should be a bunch of frames leftover + assert!(!frames.is_empty()); + // If we ingest one more frame, the channel bank should prune + // and the size should be the same + let next_frame = frames.pop().unwrap(); + channel_bank.ingest_frame(next_frame).unwrap(); + assert_eq!(channel_bank.size(), current_size); + } + #[tokio::test] async fn test_read_empty_channel_bank() { let frames = new_test_frames(1); diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 0b64316d..225be637 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -146,27 +146,22 @@ impl ResettableStage for L1Traversal { #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::{ - errors::PipelineErrorKind, - params::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC}, - traits::test_utils::TestChainProvider, - }; + use crate::{errors::PipelineErrorKind, traits::test_utils::TestChainProvider}; use alloc::vec; use alloy_consensus::Receipt; use alloy_primitives::{address, b256, hex, Bytes, Log, LogData, B256}; + use op_alloy_genesis::system::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC}; const L1_SYS_CONFIG_ADDR: Address = address!("1337000000000000000000000000000000000000"); fn new_update_batcher_log() -> Log { - const UPDATE_TYPE: B256 = - b256!("0000000000000000000000000000000000000000000000000000000000000000"); Log { address: L1_SYS_CONFIG_ADDR, data: LogData::new_unchecked( vec![ CONFIG_UPDATE_TOPIC, CONFIG_UPDATE_EVENT_VERSION_0, - UPDATE_TYPE, + B256::ZERO, // Update type ], hex!("00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000beef").into() )