fix: derive pipeline params (#587)
refcell authored Sep 30, 2024
1 parent 3b5499c commit 391da87
Showing 4 changed files with 43 additions and 28 deletions.
1 change: 0 additions & 1 deletion crates/derive/src/lib.rs
@@ -11,7 +11,6 @@ pub mod attributes;
pub mod batch;
pub mod block;
pub mod errors;
pub mod params;
pub mod pipeline;
pub mod sources;
pub mod stages;
16 changes: 0 additions & 16 deletions crates/derive/src/params.rs

This file was deleted.

43 changes: 40 additions & 3 deletions crates/derive/src/stages/channel_bank.rs
@@ -2,7 +2,6 @@

use crate::{
errors::{PipelineError, PipelineErrorKind, PipelineResult},
params::MAX_CHANNEL_BANK_SIZE,
stages::ChannelReaderProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};
@@ -15,6 +14,12 @@ use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, Channel, ChannelId, Frame};
use tracing::{trace, warn};

/// The maximum size of a channel bank.
pub(crate) const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000;

/// The maximum size of a channel bank after the Fjord Hardfork.
pub(crate) const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000;

/// Provides frames for the [ChannelBank] stage.
#[async_trait]
pub trait ChannelBankProvider {
@@ -65,11 +70,17 @@
self.channels.iter().fold(0, |acc, (_, c)| acc + c.size())
}

/// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE].
/// Prunes the Channel bank, until it is below the max channel bank size.
/// Prunes from the high-priority channel since it failed to be read.
pub fn prune(&mut self) -> PipelineResult<()> {
let mut total_size = self.size();
while total_size > MAX_CHANNEL_BANK_SIZE {
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
let max_channel_bank_size = if self.cfg.is_fjord_active(origin.timestamp) {
FJORD_MAX_CHANNEL_BANK_SIZE
} else {
MAX_CHANNEL_BANK_SIZE
};
while total_size > max_channel_bank_size {
let id =
self.channel_queue.pop_front().ok_or(PipelineError::ChannelBankEmpty.crit())?;
let channel = self.channels.remove(&id).ok_or(PipelineError::ChannelNotFound.crit())?;
@@ -374,6 +385,32 @@ mod tests {
assert_eq!(channel_bank.size(), current_size);
}

#[test]
fn test_ingest_and_prune_channel_bank_fjord() {
use alloc::vec::Vec;
let mut frames: Vec<Frame> = new_test_frames(100000);
let mock = MockChannelBankProvider::new(vec![]);
let cfg = Arc::new(RollupConfig { fjord_time: Some(0), ..Default::default() });
let mut channel_bank = ChannelBank::new(cfg, mock);
// Ingest frames until the channel bank is full and it stops increasing in size
let mut current_size = 0;
let next_frame = frames.pop().unwrap();
channel_bank.ingest_frame(next_frame).unwrap();
while channel_bank.size() > current_size {
current_size = channel_bank.size();
let next_frame = frames.pop().unwrap();
channel_bank.ingest_frame(next_frame).unwrap();
assert!(channel_bank.size() <= FJORD_MAX_CHANNEL_BANK_SIZE);
}
// There should be a bunch of frames leftover
assert!(!frames.is_empty());
// If we ingest one more frame, the channel bank should prune
// and the size should be the same
let next_frame = frames.pop().unwrap();
channel_bank.ingest_frame(next_frame).unwrap();
assert_eq!(channel_bank.size(), current_size);
}

#[tokio::test]
async fn test_read_empty_channel_bank() {
let frames = new_test_frames(1);
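The substance of the channel bank change: the prune threshold is no longer a single crate-wide constant but is derived from the rollup config at the current L1 origin, so the bank may grow ten times larger once Fjord is active. A minimal standalone sketch of that selection follows; the free function max_channel_bank_size is illustrative only (kona keeps this logic inline in prune), while the constant values and RollupConfig::is_fjord_active mirror the diff and op-alloy-genesis.

use op_alloy_genesis::RollupConfig;

/// Pre-Fjord channel bank size cap (mirrors the constant added in this diff).
const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000;
/// Post-Fjord channel bank size cap (mirrors the constant added in this diff).
const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000;

/// Selects the channel bank size cap for a given L1 origin timestamp.
fn max_channel_bank_size(cfg: &RollupConfig, origin_timestamp: u64) -> usize {
    if cfg.is_fjord_active(origin_timestamp) {
        FJORD_MAX_CHANNEL_BANK_SIZE
    } else {
        MAX_CHANNEL_BANK_SIZE
    }
}

fn main() {
    // With `fjord_time: Some(0)`, every origin timestamp is post-Fjord,
    // so the larger cap applies (as exercised by the new test above).
    let cfg = RollupConfig { fjord_time: Some(0), ..Default::default() };
    assert_eq!(max_channel_bank_size(&cfg, 0), FJORD_MAX_CHANNEL_BANK_SIZE);

    // With no Fjord activation time configured, the legacy cap applies.
    let pre_fjord = RollupConfig::default();
    assert_eq!(max_channel_bank_size(&pre_fjord, 0), MAX_CHANNEL_BANK_SIZE);
}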
11 changes: 3 additions & 8 deletions crates/derive/src/stages/l1_traversal.rs
@@ -146,27 +146,22 @@ impl<F: ChainProvider + Send> ResettableStage for L1Traversal<F> {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{
errors::PipelineErrorKind,
params::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC},
traits::test_utils::TestChainProvider,
};
use crate::{errors::PipelineErrorKind, traits::test_utils::TestChainProvider};
use alloc::vec;
use alloy_consensus::Receipt;
use alloy_primitives::{address, b256, hex, Bytes, Log, LogData, B256};
use op_alloy_genesis::system::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC};

const L1_SYS_CONFIG_ADDR: Address = address!("1337000000000000000000000000000000000000");

fn new_update_batcher_log() -> Log {
const UPDATE_TYPE: B256 =
b256!("0000000000000000000000000000000000000000000000000000000000000000");
Log {
address: L1_SYS_CONFIG_ADDR,
data: LogData::new_unchecked(
vec![
CONFIG_UPDATE_TOPIC,
CONFIG_UPDATE_EVENT_VERSION_0,
UPDATE_TYPE,
B256::ZERO, // Update type
],
hex!("00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000beef").into()
)
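The only substantive change in this file: the config-update constants previously imported from the crate's own params module now come from op_alloy_genesis::system, and the locally defined UPDATE_TYPE constant is inlined as B256::ZERO. A minimal sketch of building the same version-0 batcher-update log against the new import path, mirroring the updated test; the standalone main and the assertion are illustrative additions, not part of the diff.

use alloy_primitives::{address, hex, Address, Log, LogData, B256};
// Previously: use crate::params::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC};
use op_alloy_genesis::system::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC};

/// The test fixture's placeholder SystemConfig address.
const L1_SYS_CONFIG_ADDR: Address = address!("1337000000000000000000000000000000000000");

/// Builds the same version-0 `ConfigUpdate` log as the test above, with the
/// topic constants sourced from `op_alloy_genesis::system`.
fn new_update_batcher_log() -> Log {
    Log {
        address: L1_SYS_CONFIG_ADDR,
        data: LogData::new_unchecked(
            vec![
                CONFIG_UPDATE_TOPIC,           // `ConfigUpdate` event signature
                CONFIG_UPDATE_EVENT_VERSION_0, // event version 0
                B256::ZERO,                    // update type 0: batcher address
            ],
            // ABI-encoded payload: offset, length, then the new batcher address (0x...beef).
            hex!("00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000beef").into(),
        ),
    }
}

fn main() {
    let log = new_update_batcher_log();
    assert_eq!(log.data.topics().len(), 3);
}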
