From ed6f8f99518294451d0a05f668445ef35f1ad5b4 Mon Sep 17 00:00:00 2001 From: refcell Date: Mon, 30 Sep 2024 17:02:58 -0400 Subject: [PATCH] feat(derive): BatchStreamProvider (#591) * feat(derive): batch stream provider * feat(derive): add a test around channel reader flushing --- crates/derive/src/stages/batch_queue.rs | 8 ++- crates/derive/src/stages/batch_stream.rs | 32 +++++++---- crates/derive/src/stages/channel_reader.rs | 27 ++++++++- crates/derive/src/stages/mod.rs | 2 +- .../src/stages/test_utils/batch_queue.rs | 4 +- .../src/stages/test_utils/batch_stream.rs | 57 +++++++++++++++++++ crates/derive/src/stages/test_utils/mod.rs | 3 + 7 files changed, 114 insertions(+), 19 deletions(-) create mode 100644 crates/derive/src/stages/test_utils/batch_stream.rs diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 26386c25..d4196814 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -22,7 +22,11 @@ pub trait BatchQueueProvider { /// complete and the batch has been consumed, an [PipelineError::Eof] error is returned. /// /// [ChannelReader]: crate::stages::ChannelReader - async fn next_batch(&mut self) -> PipelineResult; + async fn next_batch( + &mut self, + parent: L2BlockInfo, + origins: &[BlockInfo], + ) -> PipelineResult; /// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream] /// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op. @@ -336,7 +340,7 @@ where // Load more data into the batch queue. let mut out_of_data = false; - match self.prev.next_batch().await { + match self.prev.next_batch(parent, &self.l1_blocks).await { Ok(b) => { if !origin_behind { self.add_batch(b, parent).await.ok(); diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index 69e89b7f..a4abcde8 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -4,7 +4,7 @@ use alloc::{boxed::Box, sync::Arc, vec::Vec}; use async_trait::async_trait; use core::fmt::Debug; use op_alloy_genesis::{RollupConfig, SystemConfig}; -use op_alloy_protocol::BlockInfo; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use tracing::trace; use crate::{ @@ -14,6 +14,16 @@ use crate::{ traits::{OriginAdvancer, OriginProvider, ResettableStage}, }; +/// Provides [Batch]es for the [BatchStream] stage. +#[async_trait] +pub trait BatchStreamProvider { + /// Returns the next [Batch] in the [BatchStream] stage. + async fn next_batch(&mut self) -> PipelineResult; + + /// Drains the recent `Channel` if an invalid span batch is found post-holocene. + fn flush(&mut self); +} + /// [BatchStream] stage in the derivation pipeline. /// /// This stage is introduced in the [Holocene] hardfork. @@ -26,7 +36,7 @@ use crate::{ #[derive(Debug)] pub struct BatchStream

where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, { /// The previous stage in the derivation pipeline. prev: P, @@ -41,7 +51,7 @@ where impl

BatchStream

where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, { /// Create a new [BatchStream] stage. pub const fn new(prev: P, config: Arc) -> Self { @@ -65,7 +75,7 @@ where #[async_trait] impl

BatchQueueProvider for BatchStream

where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, { fn flush(&mut self) { if self.is_active().unwrap_or(false) { @@ -73,7 +83,7 @@ where } } - async fn next_batch(&mut self) -> PipelineResult { + async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult { // If the stage is not active, "pass" the next batch // through this stage to the BatchQueue stage. if !self.is_active()? { @@ -107,7 +117,7 @@ where #[async_trait] impl

OriginAdvancer for BatchStream

where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await @@ -116,7 +126,7 @@ where impl

OriginProvider for BatchStream

where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, { fn origin(&self) -> Option { self.prev.origin() @@ -126,7 +136,7 @@ where #[async_trait] impl

ResettableStage for BatchStream

where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, + P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { self.prev.reset(base, cfg).await?; @@ -141,7 +151,7 @@ mod test { use super::*; use crate::{ batch::SingleBatch, - stages::test_utils::{CollectingLayer, MockBatchQueueProvider, TraceStorage}, + stages::test_utils::{CollectingLayer, MockBatchStreamProvider, TraceStorage}, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -153,14 +163,14 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(100), ..RollupConfig::default() }); - let prev = MockBatchQueueProvider::new(data); + let prev = MockBatchStreamProvider::new(data); let mut stream = BatchStream::new(prev, config.clone()); // The stage should not be active. assert!(!stream.is_active().unwrap()); // The next batch should be passed through to the [BatchQueue] stage. - let batch = stream.next_batch().await.unwrap(); + let batch = stream.next_batch(Default::default(), &[]).await.unwrap(); assert_eq!(batch, Batch::Single(SingleBatch::default())); let logs = trace_store.get_by_level(tracing::Level::TRACE); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index a357414a..5d8b356e 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -3,7 +3,7 @@ use crate::{ batch::Batch, errors::{PipelineError, PipelineResult}, - stages::{decompress_brotli, BatchQueueProvider}, + stages::{decompress_brotli, BatchStreamProvider}, traits::{OriginAdvancer, OriginProvider, ResettableStage}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; @@ -95,11 +95,19 @@ where } #[async_trait] -impl

BatchQueueProvider for ChannelReader

+impl

BatchStreamProvider for ChannelReader

where P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, { - fn flush(&mut self) { /* noop */ + /// This method is called by the BatchStream if an invalid span batch is found. + /// In the case of an invalid span batch, the associated channel must be flushed. + /// + /// See: + /// + /// SAFETY: Only called post-holocene activation. + fn flush(&mut self) { + debug!(target: "channel-reader", "[POST-HOLOCENE] Flushing channel"); + self.next_channel(); } async fn next_batch(&mut self) -> PipelineResult { @@ -290,4 +298,17 @@ mod test { reader.next_batch(&RollupConfig::default()).unwrap(); assert_eq!(reader.cursor, decompressed_len); } + + #[tokio::test] + async fn test_flush_post_holocene() { + let raw = new_compressed_batch_data(); + let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); + let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]); + let mut reader = ChannelReader::new(mock, config); + let res = reader.next_batch().await.unwrap(); + matches!(res, Batch::Span(_)); + assert!(reader.next_batch.is_some()); + reader.flush(); + assert!(reader.next_batch.is_none()); + } } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index e6c73b48..6c34e728 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -30,7 +30,7 @@ mod channel_reader; pub use channel_reader::{ChannelReader, ChannelReaderProvider}; mod batch_stream; -pub use batch_stream::BatchStream; +pub use batch_stream::{BatchStream, BatchStreamProvider}; mod batch_queue; pub use batch_queue::{BatchQueue, BatchQueueProvider}; diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index e0bcc945..76646d3b 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -9,7 +9,7 @@ use crate::{ use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; use op_alloy_genesis::SystemConfig; -use op_alloy_protocol::BlockInfo; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; /// A mock provider for the [BatchQueue] stage. #[derive(Debug, Default)] @@ -38,7 +38,7 @@ impl BatchQueueProvider for MockBatchQueueProvider { fn flush(&mut self) { /* noop */ } - async fn next_batch(&mut self) -> PipelineResult { + async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult { self.batches.pop().ok_or(PipelineError::Eof.temp())? } } diff --git a/crates/derive/src/stages/test_utils/batch_stream.rs b/crates/derive/src/stages/test_utils/batch_stream.rs new file mode 100644 index 00000000..efbd2cbc --- /dev/null +++ b/crates/derive/src/stages/test_utils/batch_stream.rs @@ -0,0 +1,57 @@ +//! A mock implementation of the [`BatchStream`] stage for testing. + +use crate::{ + batch::Batch, + errors::{PipelineError, PipelineResult}, + stages::batch_stream::BatchStreamProvider, + traits::{OriginAdvancer, OriginProvider, ResettableStage}, +}; +use alloc::{boxed::Box, vec::Vec}; +use async_trait::async_trait; +use op_alloy_genesis::SystemConfig; +use op_alloy_protocol::BlockInfo; + +/// A mock provider for the `BatchStream` stage. +#[derive(Debug, Default)] +pub struct MockBatchStreamProvider { + /// The origin of the L1 block. + pub origin: Option, + /// A list of batches to return. + pub batches: Vec>, +} + +impl MockBatchStreamProvider { + /// Creates a new [MockBatchStreamProvider] with the given origin and batches. + pub fn new(batches: Vec>) -> Self { + Self { origin: Some(BlockInfo::default()), batches } + } +} + +impl OriginProvider for MockBatchStreamProvider { + fn origin(&self) -> Option { + self.origin + } +} + +#[async_trait] +impl BatchStreamProvider for MockBatchStreamProvider { + fn flush(&mut self) {} + + async fn next_batch(&mut self) -> PipelineResult { + self.batches.pop().ok_or(PipelineError::Eof.temp())? + } +} + +#[async_trait] +impl OriginAdvancer for MockBatchStreamProvider { + async fn advance_origin(&mut self) -> PipelineResult<()> { + Ok(()) + } +} + +#[async_trait] +impl ResettableStage for MockBatchStreamProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> { + Ok(()) + } +} diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs index ddc4b963..5c0b1c92 100644 --- a/crates/derive/src/stages/test_utils/mod.rs +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -4,6 +4,9 @@ mod batch_queue; pub use batch_queue::MockBatchQueueProvider; +mod batch_stream; +pub use batch_stream::MockBatchStreamProvider; + mod attributes_queue; pub use attributes_queue::{ new_attributes_provider, MockAttributesBuilder, MockAttributesProvider,