diff --git a/crates/derive/src/batch/mod.rs b/crates/derive/src/batch/mod.rs index 790b7909..38b654bb 100644 --- a/crates/derive/src/batch/mod.rs +++ b/crates/derive/src/batch/mod.rs @@ -19,8 +19,7 @@ pub use span_batch::{ RawSpanBatch, SpanBatch, SpanBatchBits, SpanBatchEip1559TransactionData, SpanBatchEip2930TransactionData, SpanBatchElement, SpanBatchError, SpanBatchLegacyTransactionData, SpanBatchPayload, SpanBatchPrefix, SpanBatchSignature, - SpanBatchTransactionData, SpanBatchTransactions, SpanDecodingError, FJORD_MAX_SPAN_BATCH_BYTES, - MAX_SPAN_BATCH_BYTES, + SpanBatchTransactionData, SpanBatchTransactions, SpanDecodingError, MAX_SPAN_BATCH_ELEMENTS, }; mod single_batch; @@ -102,7 +101,7 @@ impl Batch { Ok(Self::Single(single_batch)) } BatchType::Span => { - let mut raw_span_batch = RawSpanBatch::decode(r, cfg)?; + let mut raw_span_batch = RawSpanBatch::decode(r)?; let span_batch = raw_span_batch .derive(cfg.block_time, cfg.genesis.l2_time, cfg.l2_chain_id) .map_err(PipelineEncodingError::SpanBatchError)?; diff --git a/crates/derive/src/batch/span_batch/bits.rs b/crates/derive/src/batch/span_batch/bits.rs index 94f7859e..7f8fd265 100644 --- a/crates/derive/src/batch/span_batch/bits.rs +++ b/crates/derive/src/batch/span_batch/bits.rs @@ -1,6 +1,6 @@ //! Module for working with span batch bits. -use super::{errors::SpanBatchError, FJORD_MAX_SPAN_BATCH_BYTES, MAX_SPAN_BATCH_BYTES}; +use super::errors::SpanBatchError; use alloc::{vec, vec::Vec}; use alloy_rlp::Buf; use core::cmp::Ordering; @@ -16,29 +16,11 @@ impl AsRef<[u8]> for SpanBatchBits { } impl SpanBatchBits { - /// Returns the max amount of bytes that can be stored in the bitlist. - pub const fn max_bytes(is_fjord_active: bool) -> usize { - if is_fjord_active { - FJORD_MAX_SPAN_BATCH_BYTES as usize - } else { - MAX_SPAN_BATCH_BYTES as usize - } - } - /// Decodes a standard span-batch bitlist from a reader. /// The bitlist is encoded as big-endian integer, left-padded with zeroes to a multiple of 8 - /// bits. The encoded bitlist cannot be longer than [MAX_SPAN_BATCH_BYTES]. - pub fn decode( - b: &mut &[u8], - bit_length: usize, - is_fjord_active: bool, - ) -> Result { + /// bits. The encoded bitlist cannot be longer than `bit_length`. + pub fn decode(b: &mut &[u8], bit_length: usize) -> Result { let buffer_len = bit_length / 8 + if bit_length % 8 != 0 { 1 } else { 0 }; - let max_bytes = Self::max_bytes(is_fjord_active); - if buffer_len > max_bytes { - return Err(SpanBatchError::TooBigSpanBatchSize); - } - let bits = if b.len() < buffer_len { let mut bits = vec![0; buffer_len]; bits[..b.len()].copy_from_slice(b); @@ -60,14 +42,8 @@ impl SpanBatchBits { /// Encodes a standard span-batch bitlist. /// The bitlist is encoded as big-endian integer, left-padded with zeroes to a multiple of 8 - /// bits. The encoded bitlist cannot be longer than [MAX_SPAN_BATCH_BYTES] or - /// [FJORD_MAX_SPAN_BATCH_BYTES] if fjord is active. - pub fn encode( - w: &mut Vec, - bit_length: usize, - bits: &Self, - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { + /// bits. The encoded bitlist cannot be longer than `bit_length` + pub fn encode(w: &mut Vec, bit_length: usize, bits: &Self) -> Result<(), SpanBatchError> { if bits.bit_len() > bit_length { return Err(SpanBatchError::BitfieldTooLong); } @@ -75,10 +51,6 @@ impl SpanBatchBits { // Round up, ensure enough bytes when number of bits is not a multiple of 8. // Alternative of (L+7)/8 is not overflow-safe. let buf_len = bit_length / 8 + if bit_length % 8 != 0 { 1 } else { 0 }; - let max_bytes = Self::max_bytes(is_fjord_active); - if buf_len > max_bytes { - return Err(SpanBatchError::TooBigSpanBatchSize); - } let mut buf = vec![0; buf_len]; buf[buf_len - bits.0.len()..].copy_from_slice(bits.as_ref()); w.extend_from_slice(&buf); @@ -174,19 +146,13 @@ mod test { use super::*; use proptest::{collection::vec, prelude::any, proptest}; - #[test] - fn test_bitlist_max_bytes() { - assert_eq!(SpanBatchBits::max_bytes(false), MAX_SPAN_BATCH_BYTES as usize); - assert_eq!(SpanBatchBits::max_bytes(true), FJORD_MAX_SPAN_BATCH_BYTES as usize); - } - proptest! { #[test] fn test_encode_decode_roundtrip_span_bitlist(vec in vec(any::(), 0..5096)) { let bits = SpanBatchBits(vec); - assert_eq!(SpanBatchBits::decode(&mut bits.as_ref(), bits.0.len() * 8, false).unwrap(), bits); + assert_eq!(SpanBatchBits::decode(&mut bits.as_ref(), bits.0.len() * 8).unwrap(), bits); let mut encoded = Vec::new(); - SpanBatchBits::encode(&mut encoded, bits.0.len() * 8, &bits, false).unwrap(); + SpanBatchBits::encode(&mut encoded, bits.0.len() * 8, &bits).unwrap(); assert_eq!(encoded, bits.0); } diff --git a/crates/derive/src/batch/span_batch/mod.rs b/crates/derive/src/batch/span_batch/mod.rs index b71312be..b32afb19 100644 --- a/crates/derive/src/batch/span_batch/mod.rs +++ b/crates/derive/src/batch/span_batch/mod.rs @@ -10,19 +10,9 @@ //! txs = contract_creation_bits ++ y_parity_bits ++ tx_sigs ++ tx_tos ++ tx_datas ++ tx_nonces ++ tx_gases ++ protected_bits //! ``` -/// [MAX_SPAN_BATCH_BYTES] is the maximum amount of bytes that will be needed -/// to decode every span batch field. -/// -/// This value cannot be larger than MaxRLPBytesPerChannel because single batch cannot be larger -/// than channel size. -pub const MAX_SPAN_BATCH_BYTES: u64 = op_alloy_protocol::MAX_RLP_BYTES_PER_CHANNEL; - -/// [FJORD_MAX_SPAN_BATCH_BYTES] is the maximum amount of bytes that will be needed -/// to decode every span batch field after the Fjord Hardfork. -/// -/// This value cannot be larger than MaxRLPBytesPerChannel because single batch -/// cannot be larger than channel size. -pub const FJORD_MAX_SPAN_BATCH_BYTES: u64 = op_alloy_protocol::FJORD_MAX_RLP_BYTES_PER_CHANNEL; +/// MAX_SPAN_BATCH_ELEMENTS is the maximum number of blocks, transactions in total, +/// or transaction per block allowed in a span batch. +pub const MAX_SPAN_BATCH_ELEMENTS: u64 = 10_000_000; mod batch; pub use batch::SpanBatch; diff --git a/crates/derive/src/batch/span_batch/payload.rs b/crates/derive/src/batch/span_batch/payload.rs index fc7bd3db..b97fa938 100644 --- a/crates/derive/src/batch/span_batch/payload.rs +++ b/crates/derive/src/batch/span_batch/payload.rs @@ -1,6 +1,6 @@ //! Raw Span Batch Payload -use super::{FJORD_MAX_SPAN_BATCH_BYTES, MAX_SPAN_BATCH_BYTES}; +use super::MAX_SPAN_BATCH_ELEMENTS; use crate::batch::{SpanBatchBits, SpanBatchError, SpanBatchTransactions, SpanDecodingError}; use alloc::vec::Vec; @@ -20,58 +20,40 @@ pub struct SpanBatchPayload { impl SpanBatchPayload { /// Decodes a [SpanBatchPayload] from a reader. - pub fn decode_payload(r: &mut &[u8], is_fjord_active: bool) -> Result { + pub fn decode_payload(r: &mut &[u8]) -> Result { let mut payload = Self::default(); - payload.decode_block_count(r, is_fjord_active)?; - payload.decode_origin_bits(r, is_fjord_active)?; - payload.decode_block_tx_counts(r, is_fjord_active)?; - payload.decode_txs(r, is_fjord_active)?; + payload.decode_block_count(r)?; + payload.decode_origin_bits(r)?; + payload.decode_block_tx_counts(r)?; + payload.decode_txs(r)?; Ok(payload) } /// Encodes a [SpanBatchPayload] into a writer. - pub fn encode_payload( - &self, - w: &mut Vec, - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { + pub fn encode_payload(&self, w: &mut Vec) -> Result<(), SpanBatchError> { self.encode_block_count(w); - self.encode_origin_bits(w, is_fjord_active)?; + self.encode_origin_bits(w)?; self.encode_block_tx_counts(w); - self.encode_txs(w, is_fjord_active) + self.encode_txs(w) } /// Decodes the origin bits from a reader. - pub fn decode_origin_bits( - &mut self, - r: &mut &[u8], - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - self.origin_bits = SpanBatchBits::decode(r, self.block_count as usize, is_fjord_active)?; - Ok(()) - } - - /// Returns the max span batch size based on the Fjord hardfork. - pub const fn max_span_batch_size(&self, is_fjord_active: bool) -> usize { - if is_fjord_active { - FJORD_MAX_SPAN_BATCH_BYTES as usize - } else { - MAX_SPAN_BATCH_BYTES as usize + pub fn decode_origin_bits(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { + if self.block_count > MAX_SPAN_BATCH_ELEMENTS { + return Err(SpanBatchError::TooBigSpanBatchSize); } + + self.origin_bits = SpanBatchBits::decode(r, self.block_count as usize)?; + Ok(()) } /// Decode a block count from a reader. - pub fn decode_block_count( - &mut self, - r: &mut &[u8], - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { + pub fn decode_block_count(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { let (block_count, remaining) = unsigned_varint::decode::u64(r) .map_err(|_| SpanBatchError::Decoding(SpanDecodingError::BlockCount))?; // The number of transactions in a single L2 block cannot be greater than - // [MAX_SPAN_BATCH_BYTES] or [FJORD_MAX_SPAN_BATCH_BYTES] if Fjord is active. - let max_span_batch_size = self.max_span_batch_size(is_fjord_active); - if block_count as usize > max_span_batch_size { + // [MAX_SPAN_BATCH_ELEMENTS]. + if block_count > MAX_SPAN_BATCH_ELEMENTS { return Err(SpanBatchError::TooBigSpanBatchSize); } if block_count == 0 { @@ -83,11 +65,7 @@ impl SpanBatchPayload { } /// Decode block transaction counts from a reader. - pub fn decode_block_tx_counts( - &mut self, - r: &mut &[u8], - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { + pub fn decode_block_tx_counts(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { // Initially allocate the vec with the block count, to reduce re-allocations in the first // few blocks. let mut block_tx_counts = Vec::with_capacity(self.block_count as usize); @@ -97,10 +75,8 @@ impl SpanBatchPayload { .map_err(|_| SpanBatchError::Decoding(SpanDecodingError::BlockTxCounts))?; // The number of transactions in a single L2 block cannot be greater than - // [MAX_SPAN_BATCH_BYTES] or [FJORD_MAX_SPAN_BATCH_BYTES] if Fjord is active. - // Every transaction will take at least a single byte. - let max_span_batch_size = self.max_span_batch_size(is_fjord_active); - if block_tx_count as usize > max_span_batch_size { + // [MAX_SPAN_BATCH_ELEMENTS]. + if block_tx_count > MAX_SPAN_BATCH_ELEMENTS { return Err(SpanBatchError::TooBigSpanBatchSize); } block_tx_counts.push(block_tx_count); @@ -111,11 +87,7 @@ impl SpanBatchPayload { } /// Decode transactions from a reader. - pub fn decode_txs( - &mut self, - r: &mut &[u8], - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { + pub fn decode_txs(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { if self.block_tx_counts.is_empty() { return Err(SpanBatchError::EmptySpanBatch); } @@ -126,23 +98,18 @@ impl SpanBatchPayload { })?; // The total number of transactions in a span batch cannot be greater than - // [MAX_SPAN_BATCH_BYTES] or [FJORD_MAX_SPAN_BATCH_BYTES] if Fjord is active. - let max_span_batch_size = self.max_span_batch_size(is_fjord_active); - if total_block_tx_count as usize > max_span_batch_size { + // [MAX_SPAN_BATCH_ELEMENTS]. + if total_block_tx_count > MAX_SPAN_BATCH_ELEMENTS { return Err(SpanBatchError::TooBigSpanBatchSize); } self.txs.total_block_tx_count = total_block_tx_count; - self.txs.decode(r, is_fjord_active)?; + self.txs.decode(r)?; Ok(()) } /// Encode the origin bits into a writer. - pub fn encode_origin_bits( - &self, - w: &mut Vec, - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - SpanBatchBits::encode(w, self.block_count as usize, &self.origin_bits, is_fjord_active) + pub fn encode_origin_bits(&self, w: &mut Vec) -> Result<(), SpanBatchError> { + SpanBatchBits::encode(w, self.block_count as usize, &self.origin_bits) } /// Encode the block count into a writer. @@ -161,8 +128,8 @@ impl SpanBatchPayload { } /// Encode the transactions into a writer. - pub fn encode_txs(&self, w: &mut Vec, is_fjord_active: bool) -> Result<(), SpanBatchError> { - self.txs.encode(w, is_fjord_active) + pub fn encode_txs(&self, w: &mut Vec) -> Result<(), SpanBatchError> { + self.txs.encode(w) } } @@ -177,7 +144,7 @@ mod tests { let encoded = vec![2; block_count / 8 + 1]; let mut payload = SpanBatchPayload { block_count: block_count as u64, ..Default::default() }; - payload.decode_origin_bits(&mut encoded.as_slice(), false).unwrap(); + payload.decode_origin_bits(&mut encoded.as_slice()).unwrap(); assert_eq!(payload.origin_bits, SpanBatchBits(vec![2; block_count / 8 + 1])); } @@ -186,47 +153,27 @@ mod tests { let mut u64_varint_buf = [0; 10]; let mut encoded = unsigned_varint::encode::u64(0, &mut u64_varint_buf); let mut payload = SpanBatchPayload::default(); - let err = payload.decode_block_count(&mut encoded, false).unwrap_err(); + let err = payload.decode_block_count(&mut encoded).unwrap_err(); assert_eq!(err, SpanBatchError::EmptySpanBatch); } #[test] - fn test_decode_block_count_pre_fjord() { - let block_count = MAX_SPAN_BATCH_BYTES; + fn test_decode_block_count() { + let block_count = MAX_SPAN_BATCH_ELEMENTS; let mut u64_varint_buf = [0; 10]; let mut encoded = unsigned_varint::encode::u64(block_count, &mut u64_varint_buf); let mut payload = SpanBatchPayload::default(); - payload.decode_block_count(&mut encoded, false).unwrap(); + payload.decode_block_count(&mut encoded).unwrap(); assert_eq!(payload.block_count, block_count); } #[test] - fn test_decode_block_count_pre_fjord_errors() { - let block_count = MAX_SPAN_BATCH_BYTES + 1; + fn test_decode_block_count_errors() { + let block_count = MAX_SPAN_BATCH_ELEMENTS + 1; let mut u64_varint_buf = [0; 10]; let mut encoded = unsigned_varint::encode::u64(block_count, &mut u64_varint_buf); let mut payload = SpanBatchPayload::default(); - let err = payload.decode_block_count(&mut encoded, false).unwrap_err(); - assert_eq!(err, SpanBatchError::TooBigSpanBatchSize); - } - - #[test] - fn test_decode_block_count_post_fjord() { - let block_count = FJORD_MAX_SPAN_BATCH_BYTES; - let mut u64_varint_buf = [0; 10]; - let mut encoded = unsigned_varint::encode::u64(block_count, &mut u64_varint_buf); - let mut payload = SpanBatchPayload::default(); - payload.decode_block_count(&mut encoded, true).unwrap(); - assert_eq!(payload.block_count, block_count); - } - - #[test] - fn test_decode_block_count_post_fjord_errors() { - let block_count = FJORD_MAX_SPAN_BATCH_BYTES + 1; - let mut u64_varint_buf = [0; 10]; - let mut encoded = unsigned_varint::encode::u64(block_count, &mut u64_varint_buf); - let mut payload = SpanBatchPayload::default(); - let err = payload.decode_block_count(&mut encoded, true).unwrap_err(); + let err = payload.decode_block_count(&mut encoded).unwrap_err(); assert_eq!(err, SpanBatchError::TooBigSpanBatchSize); } @@ -236,21 +183,14 @@ mod tests { let mut u64_varint_buf = [0; 10]; let mut encoded = unsigned_varint::encode::u64(block_count, &mut u64_varint_buf); let mut payload = SpanBatchPayload::default(); - payload.decode_block_count(&mut encoded, false).unwrap(); + payload.decode_block_count(&mut encoded).unwrap(); let mut r: Vec = Vec::new(); for _ in 0..2 { let mut buf = [0u8; 10]; let encoded = unsigned_varint::encode::u64(2, &mut buf); r.append(&mut encoded.to_vec()); } - payload.decode_block_tx_counts(&mut r.as_slice(), false).unwrap(); + payload.decode_block_tx_counts(&mut r.as_slice()).unwrap(); assert_eq!(payload.block_tx_counts, vec![2, 2]); } - - #[test] - fn test_max_span_batch_size() { - let payload = SpanBatchPayload::default(); - assert_eq!(payload.max_span_batch_size(false), MAX_SPAN_BATCH_BYTES as usize); - assert_eq!(payload.max_span_batch_size(true), FJORD_MAX_SPAN_BATCH_BYTES as usize); - } } diff --git a/crates/derive/src/batch/span_batch/prefix.rs b/crates/derive/src/batch/span_batch/prefix.rs index f546db41..51376f5c 100644 --- a/crates/derive/src/batch/span_batch/prefix.rs +++ b/crates/derive/src/batch/span_batch/prefix.rs @@ -1,10 +1,8 @@ //! Raw Span Batch Prefix +use crate::batch::{SpanBatchError, SpanDecodingError}; use alloc::vec::Vec; use alloy_primitives::FixedBytes; -use op_alloy_genesis::RollupConfig; - -use crate::batch::{SpanBatchError, SpanDecodingError}; /// Span Batch Prefix #[derive(Debug, Clone, Default, PartialEq, Eq)] @@ -30,11 +28,6 @@ impl SpanBatchPrefix { Ok(prefix) } - /// Returns if the Fjord hardfork is active based on the relative timestamp of the span batch. - pub fn is_fjord_active(&self, cfg: &RollupConfig) -> bool { - cfg.is_fjord_active(self.rel_timestamp + cfg.genesis.l2_time) - } - /// Decodes the relative timestamp from a reader. pub fn decode_rel_timestamp(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { let (rel_timestamp, remaining) = unsigned_varint::decode::u64(r) @@ -87,17 +80,6 @@ mod test { use alloc::vec::Vec; use alloy_primitives::address; - #[test] - fn test_is_fjord_active() { - let mut cfg = RollupConfig::default(); - let prefix = SpanBatchPrefix { rel_timestamp: 10, ..Default::default() }; - assert!(!prefix.is_fjord_active(&cfg)); - cfg.fjord_time = Some(11); - assert!(!prefix.is_fjord_active(&cfg)); - cfg.fjord_time = Some(9); - assert!(prefix.is_fjord_active(&cfg)); - } - #[test] fn test_span_batch_prefix_encoding_roundtrip() { let expected = SpanBatchPrefix { diff --git a/crates/derive/src/batch/span_batch/raw.rs b/crates/derive/src/batch/span_batch/raw.rs index 365f36fa..9e5ef614 100644 --- a/crates/derive/src/batch/span_batch/raw.rs +++ b/crates/derive/src/batch/span_batch/raw.rs @@ -1,10 +1,8 @@ //! Raw Span Batch -use alloc::{vec, vec::Vec}; -use op_alloy_genesis::RollupConfig; - use super::{SpanBatch, SpanBatchElement, SpanBatchError, SpanBatchPayload, SpanBatchPrefix}; use crate::batch::{BatchType, SpanDecodingError}; +use alloc::{vec, vec::Vec}; /// Raw Span Batch #[derive(Debug, Clone, PartialEq, Eq)] @@ -50,28 +48,16 @@ impl RawSpanBatch { BatchType::Span } - /// Returns the timestamp for the span batch. - pub const fn timestamp(&self) -> u64 { - self.prefix.rel_timestamp - } - - fn is_fjord_active(prefix: &SpanBatchPrefix, cfg: &RollupConfig) -> bool { - let timestamp = cfg.genesis.l2_time + prefix.rel_timestamp; - cfg.is_fjord_active(timestamp) - } - /// Encodes the [RawSpanBatch] into a writer. - pub fn encode(&self, w: &mut Vec, cfg: &RollupConfig) -> Result<(), SpanBatchError> { + pub fn encode(&self, w: &mut Vec) -> Result<(), SpanBatchError> { self.prefix.encode_prefix(w); - let is_fjord_active = Self::is_fjord_active(&self.prefix, cfg); - self.payload.encode_payload(w, is_fjord_active) + self.payload.encode_payload(w) } /// Decodes the [RawSpanBatch] from a reader.] - pub fn decode(r: &mut &[u8], cfg: &RollupConfig) -> Result { + pub fn decode(r: &mut &[u8]) -> Result { let prefix = SpanBatchPrefix::decode_prefix(r)?; - let is_fjord_active = Self::is_fjord_active(&prefix, cfg); - let payload = SpanBatchPayload::decode_payload(r, is_fjord_active)?; + let payload = SpanBatchPayload::decode_payload(r)?; Ok(Self { prefix, payload }) } @@ -137,7 +123,7 @@ impl RawSpanBatch { #[cfg(test)] mod test { - use super::{RawSpanBatch, RollupConfig, SpanBatch, SpanBatchElement}; + use super::{RawSpanBatch, SpanBatch, SpanBatchElement}; use alloc::{vec, vec::Vec}; use alloy_primitives::FixedBytes; @@ -177,13 +163,11 @@ mod test { fn test_decode_encode_raw_span_batch() { // Load in the raw span batch from the `op-node` derivation pipeline implementation. let raw_span_batch_hex = include_bytes!("../../../testdata/raw_batch.hex"); - let cfg = RollupConfig::default(); - let mut raw_span_batch = - RawSpanBatch::decode(&mut raw_span_batch_hex.as_slice(), &cfg).unwrap(); + let mut raw_span_batch = RawSpanBatch::decode(&mut raw_span_batch_hex.as_slice()).unwrap(); raw_span_batch.payload.txs.recover_v(981).unwrap(); let mut encoding_buf = Vec::new(); - raw_span_batch.encode(&mut encoding_buf, &cfg).unwrap(); + raw_span_batch.encode(&mut encoding_buf).unwrap(); assert_eq!(encoding_buf, raw_span_batch_hex); } } diff --git a/crates/derive/src/batch/span_batch/transactions.rs b/crates/derive/src/batch/span_batch/transactions.rs index c092f52e..90514067 100644 --- a/crates/derive/src/batch/span_batch/transactions.rs +++ b/crates/derive/src/batch/span_batch/transactions.rs @@ -3,7 +3,7 @@ use super::{ convert_v_to_y_parity, read_tx_data, utils::is_protected_v, SpanBatchBits, SpanBatchError, - SpanBatchSignature, SpanBatchTransactionData, SpanDecodingError, + SpanBatchSignature, SpanBatchTransactionData, SpanDecodingError, MAX_SPAN_BATCH_ELEMENTS, }; use alloc::vec::Vec; use alloy_consensus::{Transaction, TxEnvelope, TxType}; @@ -40,73 +40,46 @@ pub struct SpanBatchTransactions { impl SpanBatchTransactions { /// Encodes the [SpanBatchTransactions] into a writer. - pub fn encode(&self, w: &mut Vec, is_fjord_active: bool) -> Result<(), SpanBatchError> { - self.encode_contract_creation_bits(w, is_fjord_active)?; - self.encode_y_parity_bits(w, is_fjord_active)?; + pub fn encode(&self, w: &mut Vec) -> Result<(), SpanBatchError> { + self.encode_contract_creation_bits(w)?; + self.encode_y_parity_bits(w)?; self.encode_tx_sigs_rs(w)?; self.encode_tx_tos(w)?; self.encode_tx_datas(w)?; self.encode_tx_nonces(w)?; self.encode_tx_gases(w)?; - self.encode_protected_bits(w, is_fjord_active)?; + self.encode_protected_bits(w)?; Ok(()) } /// Decodes the [SpanBatchTransactions] from a reader. - pub fn decode(&mut self, r: &mut &[u8], is_fjord_active: bool) -> Result<(), SpanBatchError> { - self.decode_contract_creation_bits(r, is_fjord_active)?; - self.decode_y_parity_bits(r, is_fjord_active)?; + pub fn decode(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { + self.decode_contract_creation_bits(r)?; + self.decode_y_parity_bits(r)?; self.decode_tx_sigs_rs(r)?; self.decode_tx_tos(r)?; self.decode_tx_datas(r)?; self.decode_tx_nonces(r)?; self.decode_tx_gases(r)?; - self.decode_protected_bits(r, is_fjord_active)?; + self.decode_protected_bits(r)?; Ok(()) } /// Encode the contract creation bits into a writer. - pub fn encode_contract_creation_bits( - &self, - w: &mut Vec, - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - SpanBatchBits::encode( - w, - self.total_block_tx_count as usize, - &self.contract_creation_bits, - is_fjord_active, - )?; + pub fn encode_contract_creation_bits(&self, w: &mut Vec) -> Result<(), SpanBatchError> { + SpanBatchBits::encode(w, self.total_block_tx_count as usize, &self.contract_creation_bits)?; Ok(()) } /// Encode the protected bits into a writer. - pub fn encode_protected_bits( - &self, - w: &mut Vec, - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - SpanBatchBits::encode( - w, - self.legacy_tx_count as usize, - &self.protected_bits, - is_fjord_active, - )?; + pub fn encode_protected_bits(&self, w: &mut Vec) -> Result<(), SpanBatchError> { + SpanBatchBits::encode(w, self.legacy_tx_count as usize, &self.protected_bits)?; Ok(()) } /// Encode the y parity bits into a writer. - pub fn encode_y_parity_bits( - &self, - w: &mut Vec, - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - SpanBatchBits::encode( - w, - self.total_block_tx_count as usize, - &self.y_parity_bits, - is_fjord_active, - )?; + pub fn encode_y_parity_bits(&self, w: &mut Vec) -> Result<(), SpanBatchError> { + SpanBatchBits::encode(w, self.total_block_tx_count as usize, &self.y_parity_bits)?; Ok(()) } @@ -156,35 +129,28 @@ impl SpanBatchTransactions { } /// Decode the contract creation bits from a reader. - pub fn decode_contract_creation_bits( - &mut self, - r: &mut &[u8], - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - self.contract_creation_bits = - SpanBatchBits::decode(r, self.total_block_tx_count as usize, is_fjord_active)?; + pub fn decode_contract_creation_bits(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { + if self.total_block_tx_count > MAX_SPAN_BATCH_ELEMENTS { + return Err(SpanBatchError::TooBigSpanBatchSize); + } + + self.contract_creation_bits = SpanBatchBits::decode(r, self.total_block_tx_count as usize)?; Ok(()) } /// Decode the protected bits from a reader. - pub fn decode_protected_bits( - &mut self, - r: &mut &[u8], - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - self.protected_bits = - SpanBatchBits::decode(r, self.legacy_tx_count as usize, is_fjord_active)?; + pub fn decode_protected_bits(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { + if self.legacy_tx_count > MAX_SPAN_BATCH_ELEMENTS { + return Err(SpanBatchError::TooBigSpanBatchSize); + } + + self.protected_bits = SpanBatchBits::decode(r, self.legacy_tx_count as usize)?; Ok(()) } /// Decode the y parity bits from a reader. - pub fn decode_y_parity_bits( - &mut self, - r: &mut &[u8], - is_fjord_active: bool, - ) -> Result<(), SpanBatchError> { - self.y_parity_bits = - SpanBatchBits::decode(r, self.total_block_tx_count as usize, is_fjord_active)?; + pub fn decode_y_parity_bits(&mut self, r: &mut &[u8]) -> Result<(), SpanBatchError> { + self.y_parity_bits = SpanBatchBits::decode(r, self.total_block_tx_count as usize)?; Ok(()) } diff --git a/crates/derive/src/errors.rs b/crates/derive/src/errors.rs index 33d1f8e5..654ece49 100644 --- a/crates/derive/src/errors.rs +++ b/crates/derive/src/errors.rs @@ -1,6 +1,6 @@ //! This module contains derivation errors thrown within the pipeline. -use crate::batch::SpanBatchError; +use crate::batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS}; use alloc::string::String; use alloy_eips::BlockNumHash; use alloy_primitives::B256; @@ -162,10 +162,8 @@ pub enum PipelineEncodingError { /// A frame decompression error. #[derive(Error, Debug, PartialEq, Eq)] pub enum BatchDecompressionError { - /// The buffer exceeds the [FJORD_MAX_SPAN_BATCH_BYTES] protocol parameter. - /// - /// [FJORD_MAX_SPAN_BATCH_BYTES]: crate::batch::FJORD_MAX_SPAN_BATCH_BYTES - #[error("The batch exceeds the maximum size of {max_size} bytes", max_size = crate::batch::FJORD_MAX_SPAN_BATCH_BYTES)] + /// The buffer exceeds the [MAX_SPAN_BATCH_ELEMENTS] protocol parameter. + #[error("The batch exceeds the maximum number of elements: {max_size}", max_size = MAX_SPAN_BATCH_ELEMENTS)] BatchTooLarge, } diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 12e7a77f..63b0b0fa 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -512,7 +512,7 @@ mod tests { use alloy_rlp::{BytesMut, Encodable}; use kona_providers::test_utils::TestL2ChainProvider; use op_alloy_consensus::{OpBlock, OpTxEnvelope, OpTxType, TxDeposit}; - use op_alloy_genesis::ChainGenesis; + use op_alloy_genesis::{ChainGenesis, MAX_RLP_BYTES_PER_CHANNEL_FJORD}; use op_alloy_protocol::{L1BlockInfoBedrock, L1BlockInfoTx}; use tracing::Level; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -523,7 +523,7 @@ mod tests { let file_contents = &(&*file_contents)[..file_contents.len() - 1]; let data = alloy_primitives::hex::decode(file_contents).unwrap(); let bytes: alloy_primitives::Bytes = data.into(); - BatchReader::from(bytes) + BatchReader::new(bytes, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize) } #[test] diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 94db1198..2890d610 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -12,7 +12,9 @@ use alloy_rlp::Decodable; use async_trait::async_trait; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec_zlib; -use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_genesis::{ + RollupConfig, SystemConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD, +}; use op_alloy_protocol::BlockInfo; use tracing::{debug, error, warn}; @@ -72,7 +74,16 @@ where if self.next_batch.is_none() { let channel = self.prev.next_data().await?.ok_or(PipelineError::ChannelReaderEmpty.temp())?; - self.next_batch = Some(BatchReader::from(&channel[..])); + + let origin = self.prev.origin().ok_or(PipelineError::MissingOrigin.crit())?; + let max_rlp_bytes_per_channel = if self.cfg.is_fjord_active(origin.timestamp) { + MAX_RLP_BYTES_PER_CHANNEL_FJORD + } else { + MAX_RLP_BYTES_PER_CHANNEL_BEDROCK + }; + + self.next_batch = + Some(BatchReader::new(&channel[..], max_rlp_bytes_per_channel as usize)); } Ok(()) } @@ -182,9 +193,24 @@ pub(crate) struct BatchReader { decompressed: Vec, /// The current cursor in the `decompressed` data. cursor: usize, + /// The maximum RLP bytes per channel. + max_rlp_bytes_per_channel: usize, } impl BatchReader { + /// Creates a new [BatchReader] from the given data and max decompressed RLP bytes per channel. + pub(crate) fn new(data: T, max_rlp_bytes_per_channel: usize) -> Self + where + T: Into>, + { + Self { + data: Some(data.into()), + decompressed: Vec::new(), + cursor: 0, + max_rlp_bytes_per_channel, + } + } + /// Pulls out the next batch from the reader. pub(crate) fn next_batch(&mut self, cfg: &RollupConfig) -> Option { // If the data is not already decompressed, decompress it. @@ -210,9 +236,15 @@ impl BatchReader { (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD { self.decompressed = decompress_to_vec_zlib(&data).ok()?; + + // Check the size of the decompressed channel RLP. + if self.decompressed.len() > self.max_rlp_bytes_per_channel { + return None; + } } else if compression_type == CHANNEL_VERSION_BROTLI { brotli_used = true; - self.decompressed = decompress_brotli(&data[1..]).ok()?; + self.decompressed = + decompress_brotli(&data[1..], self.max_rlp_bytes_per_channel).ok()?; } else { error!(target: "batch-reader", "Unsupported compression type: {:x}, skipping batch", compression_type); crate::inc!(BATCH_READER_ERRORS, &["unsupported_compression_type"]); @@ -239,17 +271,10 @@ impl BatchReader { // Advance the cursor on the reader. self.cursor = self.decompressed.len() - decompressed_reader.len(); - Some(batch) } } -impl>> From for BatchReader { - fn from(data: T) -> Self { - Self { data: Some(data.into()), decompressed: Vec::new(), cursor: 0 } - } -} - #[cfg(test)] mod test { use super::*; @@ -268,7 +293,10 @@ mod test { async fn test_flush_channel_reader() { let mock = TestChannelReaderProvider::new(vec![Ok(Some(new_compressed_batch_data()))]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); - reader.next_batch = Some(BatchReader::from(new_compressed_batch_data())); + reader.next_batch = Some(BatchReader::new( + new_compressed_batch_data(), + MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, + )); reader.flush_channel().await.unwrap(); assert!(reader.next_batch.is_none()); } @@ -277,7 +305,10 @@ mod test { async fn test_reset_channel_reader() { let mock = TestChannelReaderProvider::new(vec![Ok(None)]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); - reader.next_batch = Some(BatchReader::from(vec![0x00, 0x01, 0x02])); + reader.next_batch = Some(BatchReader::new( + vec![0x00, 0x01, 0x02], + MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize, + )); assert!(!reader.prev.reset); reader.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap(); assert!(reader.next_batch.is_none()); @@ -327,11 +358,20 @@ mod test { fn test_batch_reader() { let raw = new_compressed_batch_data(); let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len(); - let mut reader = BatchReader::from(raw); + let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize); reader.next_batch(&RollupConfig::default()).unwrap(); assert_eq!(reader.cursor, decompressed_len); } + #[test] + fn test_batch_reader_fjord() { + let raw = new_compressed_batch_data(); + let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len(); + let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize); + reader.next_batch(&RollupConfig { fjord_time: Some(0), ..Default::default() }).unwrap(); + assert_eq!(reader.cursor, decompressed_len); + } + #[tokio::test] async fn test_flush_post_holocene() { let raw = new_compressed_batch_data(); diff --git a/crates/derive/src/stages/utils.rs b/crates/derive/src/stages/utils.rs index 7d23a271..6e7c297b 100644 --- a/crates/derive/src/stages/utils.rs +++ b/crates/derive/src/stages/utils.rs @@ -1,6 +1,6 @@ //! Stage Utilities -use crate::{batch::FJORD_MAX_SPAN_BATCH_BYTES, ensure, errors::BatchDecompressionError}; +use crate::{ensure, errors::BatchDecompressionError}; use alloc::{vec, vec::Vec}; use alloc_no_stdlib::*; use brotli::*; @@ -8,7 +8,10 @@ use core::ops; /// Decompresses the given bytes data using the Brotli decompressor implemented /// in the [`brotli`](https://crates.io/crates/brotli) crate. -pub fn decompress_brotli(data: &[u8]) -> Result, BatchDecompressionError> { +pub fn decompress_brotli( + data: &[u8], + max_rlp_bytes_per_channel: usize, +) -> Result, BatchDecompressionError> { declare_stack_allocator_struct!(MemPool, 4096, stack); let mut u8_buffer = vec![0; 32 * 1024 * 1024].into_boxed_slice(); @@ -47,7 +50,7 @@ pub fn decompress_brotli(data: &[u8]) -> Result, BatchDecompressionError let new_len = old_len * 2; ensure!( - new_len as u64 <= FJORD_MAX_SPAN_BATCH_BYTES, + new_len <= max_rlp_bytes_per_channel, BatchDecompressionError::BatchTooLarge ); @@ -68,13 +71,15 @@ pub fn decompress_brotli(data: &[u8]) -> Result, BatchDecompressionError mod test { use super::*; use alloy_primitives::hex; + use op_alloy_genesis::MAX_RLP_BYTES_PER_CHANNEL_FJORD; #[test] fn test_decompress_brotli() { let expected = hex!("75ed184249e9bc19675e"); let compressed = hex!("8b048075ed184249e9bc19675e03"); - let decompressed = decompress_brotli(&compressed).unwrap(); + let decompressed = + decompress_brotli(&compressed, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize).unwrap(); assert_eq!(decompressed, expected); } @@ -83,7 +88,8 @@ mod test { let raw_batch_decompressed = hex!(""); let raw_batch = hex!(""); - let decompressed = decompress_brotli(&raw_batch).unwrap(); + let decompressed = + decompress_brotli(&raw_batch, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize).unwrap(); assert_eq!(decompressed, raw_batch_decompressed); } }