Skip to content

Commit

Permalink
feat(derive): BatchStreamProvider (#591)
Browse files Browse the repository at this point in the history
* feat(derive): batch stream provider

* feat(derive): add a test around channel reader flushing
  • Loading branch information
refcell authored Sep 30, 2024
1 parent 1147740 commit ed6f8f9
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 19 deletions.
8 changes: 6 additions & 2 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ pub trait BatchQueueProvider {
/// complete and the batch has been consumed, an [PipelineError::Eof] error is returned.
///
/// [ChannelReader]: crate::stages::ChannelReader
async fn next_batch(&mut self) -> PipelineResult<Batch>;
async fn next_batch(
&mut self,
parent: L2BlockInfo,
origins: &[BlockInfo],
) -> PipelineResult<Batch>;

/// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream]
/// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op.
Expand Down Expand Up @@ -336,7 +340,7 @@ where

// Load more data into the batch queue.
let mut out_of_data = false;
match self.prev.next_batch().await {
match self.prev.next_batch(parent, &self.l1_blocks).await {
Ok(b) => {
if !origin_behind {
self.add_batch(b, parent).await.ok();
Expand Down
32 changes: 21 additions & 11 deletions crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloc::{boxed::Box, sync::Arc, vec::Vec};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::BlockInfo;
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use tracing::trace;

use crate::{
Expand All @@ -14,6 +14,16 @@ use crate::{
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};

/// Provides [Batch]es for the [BatchStream] stage.
#[async_trait]
pub trait BatchStreamProvider {
/// Returns the next [Batch] in the [BatchStream] stage.
async fn next_batch(&mut self) -> PipelineResult<Batch>;

/// Drains the recent `Channel` if an invalid span batch is found post-holocene.
fn flush(&mut self);
}

/// [BatchStream] stage in the derivation pipeline.
///
/// This stage is introduced in the [Holocene] hardfork.
Expand All @@ -26,7 +36,7 @@ use crate::{
#[derive(Debug)]
pub struct BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
{
/// The previous stage in the derivation pipeline.
prev: P,
Expand All @@ -41,7 +51,7 @@ where

impl<P> BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
{
/// Create a new [BatchStream] stage.
pub const fn new(prev: P, config: Arc<RollupConfig>) -> Self {
Expand All @@ -65,15 +75,15 @@ where
#[async_trait]
impl<P> BatchQueueProvider for BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
fn flush(&mut self) {
if self.is_active().unwrap_or(false) {
self.buffer.clear();
}
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult<Batch> {
// If the stage is not active, "pass" the next batch
// through this stage to the BatchQueue stage.
if !self.is_active()? {
Expand Down Expand Up @@ -107,7 +117,7 @@ where
#[async_trait]
impl<P> OriginAdvancer for BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
async fn advance_origin(&mut self) -> PipelineResult<()> {
self.prev.advance_origin().await
Expand All @@ -116,7 +126,7 @@ where

impl<P> OriginProvider for BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
{
fn origin(&self) -> Option<BlockInfo> {
self.prev.origin()
Expand All @@ -126,7 +136,7 @@ where
#[async_trait]
impl<P> ResettableStage for BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send,
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send,
{
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> {
self.prev.reset(base, cfg).await?;
Expand All @@ -141,7 +151,7 @@ mod test {
use super::*;
use crate::{
batch::SingleBatch,
stages::test_utils::{CollectingLayer, MockBatchQueueProvider, TraceStorage},
stages::test_utils::{CollectingLayer, MockBatchStreamProvider, TraceStorage},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -153,14 +163,14 @@ mod test {

let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let config = Arc::new(RollupConfig { holocene_time: Some(100), ..RollupConfig::default() });
let prev = MockBatchQueueProvider::new(data);
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());

// The stage should not be active.
assert!(!stream.is_active().unwrap());

// The next batch should be passed through to the [BatchQueue] stage.
let batch = stream.next_batch().await.unwrap();
let batch = stream.next_batch(Default::default(), &[]).await.unwrap();
assert_eq!(batch, Batch::Single(SingleBatch::default()));

let logs = trace_store.get_by_level(tracing::Level::TRACE);
Expand Down
27 changes: 24 additions & 3 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::{
batch::Batch,
errors::{PipelineError, PipelineResult},
stages::{decompress_brotli, BatchQueueProvider},
stages::{decompress_brotli, BatchStreamProvider},
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
Expand Down Expand Up @@ -95,11 +95,19 @@ where
}

#[async_trait]
impl<P> BatchQueueProvider for ChannelReader<P>
impl<P> BatchStreamProvider for ChannelReader<P>
where
P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
fn flush(&mut self) { /* noop */
/// This method is called by the BatchStream if an invalid span batch is found.
/// In the case of an invalid span batch, the associated channel must be flushed.
///
/// See: <https://specs.optimism.io/protocol/holocene/derivation.html#span-batches>
///
/// SAFETY: Only called post-holocene activation.
fn flush(&mut self) {
debug!(target: "channel-reader", "[POST-HOLOCENE] Flushing channel");
self.next_channel();
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
Expand Down Expand Up @@ -290,4 +298,17 @@ mod test {
reader.next_batch(&RollupConfig::default()).unwrap();
assert_eq!(reader.cursor, decompressed_len);
}

#[tokio::test]
async fn test_flush_post_holocene() {
let raw = new_compressed_batch_data();
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]);
let mut reader = ChannelReader::new(mock, config);
let res = reader.next_batch().await.unwrap();
matches!(res, Batch::Span(_));
assert!(reader.next_batch.is_some());
reader.flush();
assert!(reader.next_batch.is_none());
}
}
2 changes: 1 addition & 1 deletion crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod channel_reader;
pub use channel_reader::{ChannelReader, ChannelReaderProvider};

mod batch_stream;
pub use batch_stream::BatchStream;
pub use batch_stream::{BatchStream, BatchStreamProvider};

mod batch_queue;
pub use batch_queue::{BatchQueue, BatchQueueProvider};
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/test_utils/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
use alloc::{boxed::Box, vec::Vec};
use async_trait::async_trait;
use op_alloy_genesis::SystemConfig;
use op_alloy_protocol::BlockInfo;
use op_alloy_protocol::{BlockInfo, L2BlockInfo};

/// A mock provider for the [BatchQueue] stage.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -38,7 +38,7 @@ impl BatchQueueProvider for MockBatchQueueProvider {
fn flush(&mut self) { /* noop */
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult<Batch> {
self.batches.pop().ok_or(PipelineError::Eof.temp())?
}
}
Expand Down
57 changes: 57 additions & 0 deletions crates/derive/src/stages/test_utils/batch_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! A mock implementation of the [`BatchStream`] stage for testing.

use crate::{
batch::Batch,
errors::{PipelineError, PipelineResult},
stages::batch_stream::BatchStreamProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, vec::Vec};
use async_trait::async_trait;
use op_alloy_genesis::SystemConfig;
use op_alloy_protocol::BlockInfo;

/// A mock provider for the `BatchStream` stage.
#[derive(Debug, Default)]
pub struct MockBatchStreamProvider {
/// The origin of the L1 block.
pub origin: Option<BlockInfo>,
/// A list of batches to return.
pub batches: Vec<PipelineResult<Batch>>,
}

impl MockBatchStreamProvider {
/// Creates a new [MockBatchStreamProvider] with the given origin and batches.
pub fn new(batches: Vec<PipelineResult<Batch>>) -> Self {
Self { origin: Some(BlockInfo::default()), batches }
}
}

impl OriginProvider for MockBatchStreamProvider {
fn origin(&self) -> Option<BlockInfo> {
self.origin
}
}

#[async_trait]
impl BatchStreamProvider for MockBatchStreamProvider {
fn flush(&mut self) {}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
self.batches.pop().ok_or(PipelineError::Eof.temp())?
}
}

#[async_trait]
impl OriginAdvancer for MockBatchStreamProvider {
async fn advance_origin(&mut self) -> PipelineResult<()> {
Ok(())
}
}

#[async_trait]
impl ResettableStage for MockBatchStreamProvider {
async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> {
Ok(())
}
}
3 changes: 3 additions & 0 deletions crates/derive/src/stages/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
mod batch_queue;
pub use batch_queue::MockBatchQueueProvider;

mod batch_stream;
pub use batch_stream::MockBatchStreamProvider;

mod attributes_queue;
pub use attributes_queue::{
new_attributes_provider, MockAttributesBuilder, MockAttributesProvider,
Expand Down

0 comments on commit ed6f8f9

Please sign in to comment.