Skip to content

Commit

Permalink
feat(derive): BatchValidator stage (#703)
Browse files Browse the repository at this point in the history
* feat(derive): `BatchValidator` stage

* tests

* lint

* tests

tests

* 🧹

* tests

* lint
  • Loading branch information
clabby authored Oct 19, 2024
1 parent 2991b08 commit 49c3118
Show file tree
Hide file tree
Showing 7 changed files with 708 additions and 39 deletions.
3 changes: 3 additions & 0 deletions crates/derive/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub enum PipelineError {
/// [L1Retrieval]: crate::stages::L1Retrieval
#[error("L1 Retrieval missing data")]
MissingL1Data,
/// Invalid batch type passed.
#[error("Invalid batch type passed to stage")]
InvalidBatchType,
/// Invalid batch validity variant.
#[error("Invalid batch validity")]
InvalidBatchValidity,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This module contains the `BatchQueue` stage implementation.

use super::NextBatchProvider;
use crate::{
batch::{Batch, BatchValidity, BatchWithInclusionBlock, SingleBatch},
errors::{PipelineEncodingError, PipelineError, PipelineErrorKind, PipelineResult, ResetError},
Expand All @@ -14,26 +15,6 @@ use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use tracing::{error, info, warn};

/// Provides [Batch]es for the [BatchQueue] stage.
#[async_trait]
pub trait BatchQueueProvider {
/// Returns the next [Batch] in the [ChannelReader] stage, if the stage is not complete.
/// This function can only be called once while the stage is in progress, and will return
/// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is
/// complete and the batch has been consumed, an [PipelineError::Eof] error is returned.
///
/// [ChannelReader]: crate::stages::ChannelReader
async fn next_batch(
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<Batch>;

/// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream]
/// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op.
fn flush(&mut self);
}

/// [BatchQueue] is responsible for o rdering unordered batches
/// and gnerating empty batches when the sequence window has passed.
///
Expand All @@ -51,7 +32,7 @@ pub trait BatchQueueProvider {
#[derive(Debug)]
pub struct BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
BF: L2ChainProvider + Debug,
{
/// The rollup config.
Expand Down Expand Up @@ -79,7 +60,7 @@ where

impl<P, BF> BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
BF: L2ChainProvider + Debug,
{
/// Creates a new [BatchQueue] stage.
Expand Down Expand Up @@ -282,7 +263,7 @@ where
#[async_trait]
impl<P, BF> OriginAdvancer for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
async fn advance_origin(&mut self) -> PipelineResult<()> {
Expand All @@ -293,7 +274,7 @@ where
#[async_trait]
impl<P, BF> AttributesProvider for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
/// Returns the next valid batch upon the given safe head.
Expand Down Expand Up @@ -450,7 +431,7 @@ where

impl<P, BF> OriginProvider for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
BF: L2ChainProvider + Debug,
{
fn origin(&self) -> Option<BlockInfo> {
Expand All @@ -461,7 +442,7 @@ where
#[async_trait]
impl<P, BF> SignalReceiver for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
Expand Down Expand Up @@ -509,7 +490,7 @@ mod tests {

fn new_batch_reader() -> BatchReader {
let file_contents =
alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex"));
alloc::string::String::from_utf8_lossy(include_bytes!("../../../testdata/batch.hex"));
let file_contents = &(&*file_contents)[..file_contents.len() - 1];
let data = alloy_primitives::hex::decode(file_contents).unwrap();
let bytes: alloy_primitives::Bytes = data.into();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
batch::{Batch, BatchValidity, BatchWithInclusionBlock, SingleBatch, SpanBatch},
errors::{PipelineEncodingError, PipelineError, PipelineResult},
pipeline::L2ChainProvider,
stages::BatchQueueProvider,
stages::NextBatchProvider,
traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver},
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
Expand Down Expand Up @@ -100,7 +100,7 @@ where
}

#[async_trait]
impl<P, BF> BatchQueueProvider for BatchStream<P, BF>
impl<P, BF> NextBatchProvider for BatchStream<P, BF>
where
P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
BF: L2ChainProvider + Send + Debug,
Expand All @@ -113,6 +113,10 @@ where
}
}

fn span_buffer_size(&self) -> usize {
self.buffer.len()
}

async fn next_batch(
&mut self,
parent: L2BlockInfo,
Expand Down Expand Up @@ -334,7 +338,7 @@ mod test {

let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
assert_eq!(stream.buffer.len(), 0);
assert_eq!(stream.span_buffer_size(), 0);
assert!(stream.span.is_none());

// Add more data into the provider, see if the buffer is re-hydrated.
Expand All @@ -359,7 +363,7 @@ mod test {

let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
assert_eq!(stream.buffer.len(), 0);
assert_eq!(stream.span_buffer_size(), 0);
assert!(stream.span.is_none());
}

Expand All @@ -376,7 +380,7 @@ mod test {
// The next batch should be passed through to the [BatchQueue] stage.
let batch = stream.next_batch(Default::default(), &[]).await.unwrap();
assert!(matches!(batch, Batch::Single(_)));
assert_eq!(stream.buffer.len(), 0);
assert_eq!(stream.span_buffer_size(), 0);
assert!(stream.span.is_none());
}
}
Loading

0 comments on commit 49c3118

Please sign in to comment.