diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs
index 409b124a..c6348091 100644
--- a/crates/derive/src/lib.rs
+++ b/crates/derive/src/lib.rs
@@ -11,7 +11,6 @@ pub mod attributes;
pub mod batch;
pub mod block;
pub mod errors;
-pub mod params;
pub mod pipeline;
pub mod sources;
pub mod stages;
diff --git a/crates/derive/src/params.rs b/crates/derive/src/params.rs
deleted file mode 100644
index 21c044a4..00000000
--- a/crates/derive/src/params.rs
+++ /dev/null
@@ -1,16 +0,0 @@
-//! This module contains the parameters and identifying types for the derivation pipeline.
-
-use alloy_primitives::{b256, B256};
-
-/// `keccak256("ConfigUpdate(uint256,uint8,bytes)")`
-pub const CONFIG_UPDATE_TOPIC: B256 =
- b256!("1d2b0bda21d56b8bd12d4f94ebacffdfb35f5e226f84b461103bb8beab6353be");
-
-/// The initial version of the system config event log.
-pub const CONFIG_UPDATE_EVENT_VERSION_0: B256 = B256::ZERO;
-
-/// The maximum size of a channel bank.
-pub const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000;
-
-/// The maximum size of a channel bank after the Fjord Hardfork.
-pub const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000;
diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs
index 50a2fdee..6e99aad2 100644
--- a/crates/derive/src/stages/channel_bank.rs
+++ b/crates/derive/src/stages/channel_bank.rs
@@ -2,7 +2,6 @@
use crate::{
errors::{PipelineError, PipelineErrorKind, PipelineResult},
- params::MAX_CHANNEL_BANK_SIZE,
stages::ChannelReaderProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};
@@ -15,6 +14,12 @@ use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, Channel, ChannelId, Frame};
use tracing::{trace, warn};
+/// The maximum size of a channel bank.
+pub(crate) const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000;
+
+/// The maximum size of a channel bank after the Fjord Hardfork.
+pub(crate) const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000;
+
/// Provides frames for the [ChannelBank] stage.
#[async_trait]
pub trait ChannelBankProvider {
@@ -65,11 +70,17 @@ where
self.channels.iter().fold(0, |acc, (_, c)| acc + c.size())
}
- /// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE].
+ /// Prunes the Channel bank, until it is below the max channel bank size.
/// Prunes from the high-priority channel since it failed to be read.
pub fn prune(&mut self) -> PipelineResult<()> {
let mut total_size = self.size();
- while total_size > MAX_CHANNEL_BANK_SIZE {
+ let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
+ let max_channel_bank_size = if self.cfg.is_fjord_active(origin.timestamp) {
+ FJORD_MAX_CHANNEL_BANK_SIZE
+ } else {
+ MAX_CHANNEL_BANK_SIZE
+ };
+ while total_size > max_channel_bank_size {
let id =
self.channel_queue.pop_front().ok_or(PipelineError::ChannelBankEmpty.crit())?;
let channel = self.channels.remove(&id).ok_or(PipelineError::ChannelNotFound.crit())?;
@@ -374,6 +385,32 @@ mod tests {
assert_eq!(channel_bank.size(), current_size);
}
+ #[test]
+ fn test_ingest_and_prune_channel_bank_fjord() {
+ use alloc::vec::Vec;
+ let mut frames: Vec = new_test_frames(100000);
+ let mock = MockChannelBankProvider::new(vec![]);
+ let cfg = Arc::new(RollupConfig { fjord_time: Some(0), ..Default::default() });
+ let mut channel_bank = ChannelBank::new(cfg, mock);
+ // Ingest frames until the channel bank is full and it stops increasing in size
+ let mut current_size = 0;
+ let next_frame = frames.pop().unwrap();
+ channel_bank.ingest_frame(next_frame).unwrap();
+ while channel_bank.size() > current_size {
+ current_size = channel_bank.size();
+ let next_frame = frames.pop().unwrap();
+ channel_bank.ingest_frame(next_frame).unwrap();
+ assert!(channel_bank.size() <= FJORD_MAX_CHANNEL_BANK_SIZE);
+ }
+ // There should be a bunch of frames leftover
+ assert!(!frames.is_empty());
+ // If we ingest one more frame, the channel bank should prune
+ // and the size should be the same
+ let next_frame = frames.pop().unwrap();
+ channel_bank.ingest_frame(next_frame).unwrap();
+ assert_eq!(channel_bank.size(), current_size);
+ }
+
#[tokio::test]
async fn test_read_empty_channel_bank() {
let frames = new_test_frames(1);
diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs
index 0b64316d..225be637 100644
--- a/crates/derive/src/stages/l1_traversal.rs
+++ b/crates/derive/src/stages/l1_traversal.rs
@@ -146,27 +146,22 @@ impl ResettableStage for L1Traversal {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
- use crate::{
- errors::PipelineErrorKind,
- params::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC},
- traits::test_utils::TestChainProvider,
- };
+ use crate::{errors::PipelineErrorKind, traits::test_utils::TestChainProvider};
use alloc::vec;
use alloy_consensus::Receipt;
use alloy_primitives::{address, b256, hex, Bytes, Log, LogData, B256};
+ use op_alloy_genesis::system::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC};
const L1_SYS_CONFIG_ADDR: Address = address!("1337000000000000000000000000000000000000");
fn new_update_batcher_log() -> Log {
- const UPDATE_TYPE: B256 =
- b256!("0000000000000000000000000000000000000000000000000000000000000000");
Log {
address: L1_SYS_CONFIG_ADDR,
data: LogData::new_unchecked(
vec![
CONFIG_UPDATE_TOPIC,
CONFIG_UPDATE_EVENT_VERSION_0,
- UPDATE_TYPE,
+ B256::ZERO, // Update type
],
hex!("00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000beef").into()
)