From 7952058b9e80e6c2dab6a4d6c09d9797e4baae56 Mon Sep 17 00:00:00 2001 From: refcell Date: Fri, 27 Sep 2024 16:00:18 -0400 Subject: [PATCH] feat(derive): Holocene Frame Queue (#579) * feat(derive): holocene frame queue changes * feat(derive): holocene frame queue * Update crates/derive/src/stages/frame_queue.rs Co-authored-by: clabby * fix(derive): fq nits --------- Co-authored-by: clabby --- crates/derive/src/pipeline/builder.rs | 2 +- crates/derive/src/stages/frame_queue.rs | 336 ++++++++++++++++-- .../src/stages/test_utils/frame_queue.rs | 11 +- 3 files changed, 309 insertions(+), 40 deletions(-) diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 89e85fa5..f51556fe 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -131,7 +131,7 @@ where let mut l1_traversal = L1Traversal::new(chain_provider, Arc::clone(&rollup_config)); l1_traversal.block = Some(builder.origin.expect("origin must be set")); let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); - let frame_queue = FrameQueue::new(l1_retrieval); + let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config)); let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue); let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config)); let batch_stream = BatchStream::new(channel_reader, rollup_config.clone()); diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index d7886e37..bf0d3ba8 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -5,11 +5,11 @@ use crate::{ stages::ChannelBankProvider, traits::{OriginAdvancer, OriginProvider, ResettableStage}, }; -use alloc::{boxed::Box, collections::VecDeque}; +use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::SystemConfig; +use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::{BlockInfo, Frame}; use tracing::{debug, error, trace}; @@ -38,6 +38,8 @@ where pub prev: P, /// The current frame queue. queue: VecDeque, + /// The rollup config. + rollup_config: Arc, } impl

FrameQueue

@@ -47,9 +49,98 @@ where /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// /// [L1Retrieval]: crate::stages::L1Retrieval - pub fn new(prev: P) -> Self { + pub fn new(prev: P, cfg: Arc) -> Self { crate::set!(STAGE_RESETS, 0, &["frame-queue"]); - Self { prev, queue: VecDeque::new() } + Self { prev, queue: VecDeque::new(), rollup_config: cfg } + } + + /// Returns if holocene is active. + pub fn is_holocene_active(&self, origin: BlockInfo) -> bool { + self.rollup_config.is_holocene_active(origin.timestamp) + } + + /// Prunes frames if Holocene is active. + pub fn prune(&mut self, origin: BlockInfo) { + if !self.is_holocene_active(origin) { + return; + } + + let mut i = 0; + while i < self.queue.len() - 1 { + let frame = &self.queue[i]; + let next_frame = &self.queue[i + 1]; + + // If the frames are in the same channel, and the current is last, drop the next frame. + if frame.id == next_frame.id && frame.is_last { + self.queue.remove(i + 1); + continue; + } + + // If the frames are in the same channel, and the frame numbers are not sequential, + // drop the next frame. + if frame.id == next_frame.id && frame.number + 1 != next_frame.number { + self.queue.remove(i + 1); + continue; + } + + // If the frames are in different channels, and the current channel is not last, walk + // back the channel and drop all prev frames. + if frame.id != next_frame.id && !frame.is_last && next_frame.number == 0 { + self.queue.remove(i); + i = i.saturating_sub(1); + continue; + } + + // If the frames are in different channels, the next frame must be first. + if frame.id != next_frame.id && next_frame.number != 0 { + self.queue.remove(i + 1); + continue; + } + + i += 1; + } + } + + /// Loads more frames into the [FrameQueue]. + pub async fn load_frames(&mut self) -> PipelineResult<()> { + // Skip loading frames if the queue is not empty. + if !self.queue.is_empty() { + return Ok(()); + } + + let data = match self.prev.next_data().await { + Ok(data) => data, + Err(e) => { + debug!(target: "frame-queue", "Failed to retrieve data: {:?}", e); + // SAFETY: Bubble up potential EOF error without wrapping. + return Err(e); + } + }; + + let Ok(frames) = Frame::parse_frames(&data.into()) else { + crate::inc!(DERIVED_FRAMES_COUNT, &["failed"]); + // There may be more frames in the queue for the + // pipeline to advance, so don't return an error here. + error!(target: "frame-queue", "Failed to parse frames from data."); + return Ok(()); + }; + + // Frame parsing should return an error if no frames were parsed + // and the above branch should be hit. Either way, return early here + // so as to not re-apply the pruning rules against all frames. + if frames.is_empty() { + error!(target: "frame-queue", "Frames should not be empty after parsing."); + return Ok(()); + } + + crate::inc!(DERIVED_FRAMES_COUNT, frames.len() as f64, &["success"]); + self.queue.extend(frames); + + // Prune frames if Holocene is active. + let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + self.prune(origin); + + Ok(()) } } @@ -69,25 +160,7 @@ where P: FrameQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, { async fn next_frame(&mut self) -> PipelineResult { - if self.queue.is_empty() { - match self.prev.next_data().await { - Ok(data) => { - if let Ok(frames) = Frame::parse_frames(&data.into()) { - crate::inc!(DERIVED_FRAMES_COUNT, frames.len() as f64, &["success"]); - self.queue.extend(frames); - } else { - crate::inc!(DERIVED_FRAMES_COUNT, &["failed"]); - // There may be more frames in the queue for the - // pipeline to advance, so don't return an error here. - error!(target: "frame-queue", "Failed to parse frames from data."); - } - } - Err(e) => { - debug!(target: "frame-queue", "Failed to retrieve data: {:?}", e); - return Err(e); // Bubble up potential EOF error without wrapping. - } - } - } + self.load_frames().await?; // If we did not add more frames but still have more data, retry this function. if self.queue.is_empty() { @@ -143,8 +216,7 @@ pub(crate) mod tests { .collect() } - pub(crate) fn new_encoded_test_frames(count: usize) -> Bytes { - let frames = new_test_frames(count); + pub(crate) fn encode_frames(frames: Vec) -> Bytes { let mut bytes = Vec::new(); bytes.extend_from_slice(&[DERIVATION_VERSION_0]); for frame in frames.iter() { @@ -153,11 +225,18 @@ pub(crate) mod tests { Bytes::from(bytes) } + pub(crate) fn new_encoded_test_frames(count: usize) -> Bytes { + let frames = new_test_frames(count); + encode_frames(frames) + } + #[tokio::test] async fn test_frame_queue_empty_bytes() { let data = vec![Ok(Bytes::from(vec![0x00]))]; - let mock = MockFrameQueueProvider { data }; - let mut frame_queue = FrameQueue::new(mock); + let mut mock = MockFrameQueueProvider::new(data); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default()); + assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); } @@ -165,8 +244,10 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_no_frames_decoded() { let data = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; - let mock = MockFrameQueueProvider { data }; - let mut frame_queue = FrameQueue::new(mock); + let mut mock = MockFrameQueueProvider::new(data); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default()); + assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); } @@ -174,8 +255,10 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_wrong_derivation_version() { let data = vec![Ok(Bytes::from(vec![0x01]))]; - let mock = MockFrameQueueProvider { data }; - let mut frame_queue = FrameQueue::new(mock); + let mut mock = MockFrameQueueProvider::new(data); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default()); + assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); } @@ -183,8 +266,10 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_frame_too_short() { let data = vec![Ok(Bytes::from(vec![0x00, 0x01]))]; - let mock = MockFrameQueueProvider { data }; - let mut frame_queue = FrameQueue::new(mock); + let mut mock = MockFrameQueueProvider::new(data); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default()); + assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::NotEnoughData.temp()); } @@ -192,8 +277,10 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_single_frame() { let data = new_encoded_test_frames(1); - let mock = MockFrameQueueProvider { data: vec![Ok(data)] }; - let mut frame_queue = FrameQueue::new(mock); + let mut mock = MockFrameQueueProvider::new(vec![Ok(data)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default()); + assert!(!frame_queue.is_holocene_active(BlockInfo::default())); let frame_decoded = frame_queue.next_frame().await.unwrap(); let frame = new_test_frames(1); assert_eq!(frame[0], frame_decoded); @@ -204,8 +291,35 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_multiple_frames() { let data = new_encoded_test_frames(3); - let mock = MockFrameQueueProvider { data: vec![Ok(data)] }; - let mut frame_queue = FrameQueue::new(mock); + let mut mock = MockFrameQueueProvider::new(vec![Ok(data)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Default::default()); + assert!(!frame_queue.is_holocene_active(BlockInfo::default())); + for i in 0..3 { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded.number, i); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_frame_queue_missing_origin() { + let data = new_encoded_test_frames(1); + let mock = MockFrameQueueProvider::new(vec![Ok(data)]); + let mut frame_queue = FrameQueue::new(mock, Default::default()); + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::MissingOrigin.crit()); + } + + #[tokio::test] + async fn test_holocene_valid_frames() { + let channel = new_encoded_test_frames(3); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(channel)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); for i in 0..3 { let frame_decoded = frame_queue.next_frame().await.unwrap(); assert_eq!(frame_decoded.number, i); @@ -213,4 +327,152 @@ pub(crate) mod tests { let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, PipelineError::Eof.temp()); } + + #[tokio::test] + async fn test_holocene_unordered_frames() { + let frames = vec![ + // -- First Channel -- + Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 2, data: vec![0xDD; 50], is_last: true }, + // Frame with the same channel id, but after is_last should be dropped. + Frame { id: [0xEE; 16], number: 3, data: vec![0xDD; 50], is_last: false }, + // -- Next Channel -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for frame in frames.iter().take(3) { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, *frame); + } + for i in 0..2 { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, frames[i + 4]); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_holocene_non_sequential_frames() { + let frames = vec![ + // -- First Channel -- + Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + // Both this and the next frames should be dropped since neither will be + // interpreted as having the next sequential frame number after 1. + Frame { id: [0xEE; 16], number: 3, data: vec![0xDD; 50], is_last: true }, + Frame { id: [0xEE; 16], number: 4, data: vec![0xDD; 50], is_last: false }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for frame in frames.iter().take(2) { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, *frame); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_holocene_unclosed_channel() { + let frames = vec![ + // -- First Channel -- + // Since this channel isn't closed by a last frame it is entirely dropped + Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 2, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 3, data: vec![0xDD; 50], is_last: false }, + // -- Next Channel -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for i in 0..2 { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, frames[i + 4]); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_holocene_unstarted_channel() { + let frames = vec![ + // -- First Channel -- + Frame { id: [0xDD; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xDD; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xDD; 16], number: 2, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xDD; 16], number: 3, data: vec![0xDD; 50], is_last: true }, + // -- Second Channel -- + // Since this channel doesn't have a starting frame where number == 0, + // it is entirely dropped. + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 2, data: vec![0xDD; 50], is_last: true }, + // -- Third Channel -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for frame in frames.iter().take(4) { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, *frame); + } + for i in 0..2 { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, frames[i + 6]); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } + + // Notice: The first channel is **not** dropped here because there can still be + // frames that come in to successfully close the channel. + #[tokio::test] + async fn test_holocene_unclosed_channel_with_invalid_start() { + let frames = vec![ + // -- First Channel -- + Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 2, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 3, data: vec![0xDD; 50], is_last: false }, + // -- Next Channel -- + // This is also an invalid channel because it is never started + // since there isn't a first frame with number == 0 + Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xFF; 16], number: 2, data: vec![0xDD; 50], is_last: true }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for frame in frames.iter().take(4) { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, *frame); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } } diff --git a/crates/derive/src/stages/test_utils/frame_queue.rs b/crates/derive/src/stages/test_utils/frame_queue.rs index 5f0b1d30..a3cb15cf 100644 --- a/crates/derive/src/stages/test_utils/frame_queue.rs +++ b/crates/derive/src/stages/test_utils/frame_queue.rs @@ -16,18 +16,25 @@ use op_alloy_protocol::BlockInfo; pub struct MockFrameQueueProvider { /// The data to return. pub data: Vec>, + /// The origin to return. + pub origin: Option, } impl MockFrameQueueProvider { /// Creates a new [MockFrameQueueProvider] with the given data. pub const fn new(data: Vec>) -> Self { - Self { data } + Self { data, origin: None } + } + + /// Sets the origin for the [MockFrameQueueProvider]. + pub fn set_origin(&mut self, origin: BlockInfo) { + self.origin = Some(origin); } } impl OriginProvider for MockFrameQueueProvider { fn origin(&self) -> Option { - None + self.origin } }