From eb8ed90a7c77376e7f1044d61a41046a758ef648 Mon Sep 17 00:00:00 2001 From: Renan Santos Date: Fri, 7 Jun 2024 13:38:44 -0300 Subject: [PATCH] fix: fix claim-mismatch on v.1x --- offchain/dispatcher/src/drivers/context.rs | 714 +++++++++++------- offchain/dispatcher/src/drivers/machine.rs | 543 +++++++------ offchain/dispatcher/src/drivers/mock.rs | 68 +- offchain/dispatcher/src/machine/mod.rs | 3 +- .../dispatcher/src/machine/rollups_broker.rs | 518 +++++++------ offchain/dispatcher/src/setup.rs | 12 +- 6 files changed, 1024 insertions(+), 834 deletions(-) diff --git a/offchain/dispatcher/src/drivers/context.rs b/offchain/dispatcher/src/drivers/context.rs index 5c788d5ac..32e79be1c 100644 --- a/offchain/dispatcher/src/drivers/context.rs +++ b/offchain/dispatcher/src/drivers/context.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use crate::{ - machine::{rollups_broker::BrokerFacadeError, BrokerSend, RollupStatus}, + machine::{rollups_broker::BrokerFacadeError, BrokerSend}, metrics::DispatcherMetrics, }; @@ -11,48 +11,53 @@ use types::foldables::Input; #[derive(Debug)] pub struct Context { - inputs_sent_count: u64, - last_event_is_finish_epoch: bool, - last_timestamp: u64, + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, // constants - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, + // metrics dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, } impl Context { pub fn new( - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, - status: RollupStatus, + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, ) -> Self { + assert!(epoch_length > 0); Self { - inputs_sent_count: status.inputs_sent_count, - last_event_is_finish_epoch: status.last_event_is_finish_epoch, - last_timestamp: genesis_timestamp, - genesis_timestamp, + inputs_sent, + last_input_epoch, + last_finished_epoch, + genesis_block, epoch_length, dapp_metadata, metrics, } } - pub fn inputs_sent_count(&self) -> u64 { - self.inputs_sent_count + pub fn inputs_sent(&self) -> u64 { + self.inputs_sent } pub async fn finish_epoch_if_needed( &mut self, - event_timestamp: u64, + block: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - if self.should_finish_epoch(event_timestamp) { - self.finish_epoch(event_timestamp, broker).await?; + let epoch = self.calculate_epoch(block); + if self.should_finish_epoch(epoch) { + self.finish_epoch(broker).await?; } Ok(()) } @@ -62,98 +67,117 @@ impl Context { input: &Input, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - broker.enqueue_input(self.inputs_sent_count, input).await?; + let input_block = input.block_added.number.as_u64(); + self.finish_epoch_if_needed(input_block, broker).await?; + + broker.enqueue_input(self.inputs_sent, input).await?; + self.metrics .advance_inputs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.inputs_sent_count += 1; - self.last_event_is_finish_epoch = false; + + self.inputs_sent += 1; + self.last_input_epoch = + Some(self.calculate_epoch(input.block_added.number.as_u64())); + Ok(()) } } impl Context { - fn calculate_epoch(&self, timestamp: u64) -> u64 { - assert!(timestamp >= self.genesis_timestamp); - (timestamp - self.genesis_timestamp) / self.epoch_length - } - - // This logic works because we call this function with `event_timestamp` being equal to the - // timestamp of each individual input, rather than just the latest from the blockchain. - fn should_finish_epoch(&self, event_timestamp: u64) -> bool { - if self.inputs_sent_count == 0 || self.last_event_is_finish_epoch { - false - } else { - let current_epoch = self.calculate_epoch(self.last_timestamp); - let event_epoch = self.calculate_epoch(event_timestamp); - event_epoch > current_epoch + fn calculate_epoch(&self, block: u64) -> u64 { + assert!(block >= self.genesis_block); + (block - self.genesis_block) / self.epoch_length + } + + fn should_finish_epoch(&self, epoch: u64) -> bool { + if self.last_finished_epoch == self.last_input_epoch { + return false; // if the current epoch is empty + } + + if epoch == self.last_input_epoch.unwrap() { + return false; // if the current epoch is still not over } + + epoch > self.last_finished_epoch.unwrap_or(0) } async fn finish_epoch( &mut self, - event_timestamp: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - assert!(event_timestamp >= self.genesis_timestamp); - broker.finish_epoch(self.inputs_sent_count).await?; + broker.finish_epoch(self.inputs_sent).await?; self.metrics .finish_epochs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.last_timestamp = event_timestamp; - self.last_event_is_finish_epoch = true; + + self.last_finished_epoch = self.last_input_epoch; Ok(()) } } #[cfg(test)] -mod private_tests { - use crate::{drivers::mock, metrics::DispatcherMetrics}; +mod tests { + use std::collections::VecDeque; - use super::{Context, DAppMetadata}; + use crate::drivers::mock::Sent; + use rollups_events::DAppMetadata; + use serial_test::serial; - // -------------------------------------------------------------------------------------------- - // calculate_epoch_for - // -------------------------------------------------------------------------------------------- + use crate::{drivers::mock, metrics::DispatcherMetrics}; - fn new_context_for_calculate_epoch_test( - genesis_timestamp: u64, - epoch_length: u64, - ) -> Context { - Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 0, - genesis_timestamp, - epoch_length, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), + use super::Context; + + impl Default for Context { + fn default() -> Self { + Context::new( + /* genesis_block */ 0, + /* epoch_length */ 10, + /* dapp_metadata */ DAppMetadata::default(), + /* metrics */ DispatcherMetrics::default(), + /* number_of_inputs_sent */ 0, + /* last_input_epoch */ None, + /* last_finished_epoch */ None, + ) } } + // -------------------------------------------------------------------------------------------- + // calculate_epoch + // -------------------------------------------------------------------------------------------- + #[test] fn calculate_epoch_with_zero_genesis() { - let epoch_length = 3; - let context = new_context_for_calculate_epoch_test(0, epoch_length); - let n = 10; + let mut context = Context::default(); + context.genesis_block = 0; + context.epoch_length = 10; + + let number_of_epochs = 10; let mut tested = 0; - for epoch in 0..n { - let x = epoch * epoch_length; - let y = (epoch + 1) * epoch_length; - for i in x..y { - assert_eq!(context.calculate_epoch(i), epoch); + for current_epoch in 0..number_of_epochs { + let block_lower_bound = current_epoch * context.epoch_length; + let block_upper_bound = (current_epoch + 1) * context.epoch_length; + for i in block_lower_bound..block_upper_bound { + assert_eq!(context.calculate_epoch(i), current_epoch); tested += 1; } } - assert_eq!(tested, n * epoch_length); - assert_eq!(context.calculate_epoch(9), 3); + + assert_eq!(tested, number_of_epochs * context.epoch_length); + assert_eq!( + context.calculate_epoch(context.epoch_length * number_of_epochs), + context.epoch_length + ); } #[test] fn calculate_epoch_with_offset_genesis() { - let context = new_context_for_calculate_epoch_test(2, 2); + let mut context = Context::default(); + context.genesis_block = 2; + context.epoch_length = 2; + assert_eq!(context.calculate_epoch(2), 0); assert_eq!(context.calculate_epoch(3), 0); assert_eq!(context.calculate_epoch(4), 1); @@ -163,68 +187,120 @@ mod private_tests { #[test] #[should_panic] - fn calculate_epoch_invalid() { - new_context_for_calculate_epoch_test(4, 3).calculate_epoch(2); + fn calculate_epoch_should_panic() { + let mut context = Context::default(); + context.genesis_block = 4; + context.epoch_length = 4; + + context.calculate_epoch(2); } // -------------------------------------------------------------------------------------------- - // should_finish_epoch + // should_finish_epoch -- first epoch // -------------------------------------------------------------------------------------------- #[test] - fn should_not_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_the_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_not_finish_epoch_because_of_zero_inputs() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_the_first_epoch_by_a_lot() { + let mut context = Context::default(); + context.inputs_sent = 110; + context.last_input_epoch = Some(9); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(100); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(context.should_finish_epoch(5)); + fn should_not_finish_an_empty_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), false); } #[test] - fn should_finish_epoch_because_last_event_is_finish_epoch() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: true, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(5)); + fn should_not_finish_a_very_late_empty_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(2340); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(9); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + // -------------------------------------------------------------------------------------------- + // should_finish_epoch -- other epochs + // -------------------------------------------------------------------------------------------- + + #[test] + fn should_finish_epoch() { + let mut context = Context::default(); + context.inputs_sent = 42; + context.last_input_epoch = Some(4); + context.last_finished_epoch = Some(3); + let epoch = context.calculate_epoch(54); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_finish_epoch_by_a_lot() { + let mut context = Context::default(); + context.inputs_sent = 142; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(2); + let epoch = context.calculate_epoch(190); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_not_finish_an_empty_epoch() { + let mut context = Context::default(); + context.inputs_sent = 120; + context.last_input_epoch = Some(9); + context.last_finished_epoch = Some(9); + let epoch = context.calculate_epoch(105); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_very_late_empty_epoch() { + let mut context = Context::default(); + context.inputs_sent = 120; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(15); + let epoch = context.calculate_epoch(1000); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_epoch() { + let mut context = Context::default(); + context.inputs_sent = 230; + context.last_input_epoch = Some(11); + context.last_finished_epoch = Some(10); + let epoch = context.calculate_epoch(110); + assert_eq!(context.should_finish_epoch(epoch), false); } // -------------------------------------------------------------------------------------------- @@ -233,72 +309,29 @@ mod private_tests { #[tokio::test] async fn finish_epoch_ok() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let broker = mock::Broker::new(vec![], vec![]); - let timestamp = 6; - let result = context.finish_epoch(timestamp, &broker).await; - assert!(result.is_ok()); - assert_eq!(context.last_timestamp, timestamp); - assert!(context.last_event_is_finish_epoch); - } + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; - #[tokio::test] - #[should_panic] - async fn finish_epoch_invalid() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 6, - genesis_timestamp: 5, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; let broker = mock::Broker::new(vec![], vec![]); - let _ = context.finish_epoch(0, &broker).await; + let result = context.finish_epoch(&broker).await; + assert!(result.is_ok()); + assert_eq!(context.inputs_sent, 1); + assert_eq!(context.last_input_epoch, Some(0)); + assert_eq!(context.last_finished_epoch, Some(0)); } #[tokio::test] async fn finish_epoch_broker_error() { - let last_timestamp = 3; - let last_event_is_finish_epoch = false; - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch, - last_timestamp, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch(6, &broker).await; + let result = context.finish_epoch(&broker).await; assert!(result.is_err()); - assert_eq!(context.last_timestamp, last_timestamp); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch - ); + assert_eq!(context.inputs_sent, 0); + assert_eq!(context.last_input_epoch, None); + assert_eq!(context.last_finished_epoch, None); } -} - -#[cfg(test)] -mod public_tests { - use crate::{ - drivers::mock::{self, SendInteraction}, - machine::RollupStatus, - metrics::DispatcherMetrics, - }; - - use super::{Context, DAppMetadata}; // -------------------------------------------------------------------------------------------- // new @@ -306,26 +339,41 @@ mod public_tests { #[tokio::test] async fn new_ok() { - let genesis_timestamp = 42; + let genesis_block = 42; let epoch_length = 24; - let inputs_sent_count = 150; - let last_event_is_finish_epoch = true; - let rollup_status = RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch, - }; + let number_of_inputs_sent = 150; + let last_input_epoch = Some(14); + let last_finished_epoch = Some(37); + let context = Context::new( - genesis_timestamp, + genesis_block, epoch_length, DAppMetadata::default(), DispatcherMetrics::default(), - rollup_status, + number_of_inputs_sent, + last_input_epoch, + last_finished_epoch, ); - assert_eq!(context.genesis_timestamp, genesis_timestamp); - assert_eq!(context.inputs_sent_count, inputs_sent_count); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch + + assert_eq!(context.genesis_block, genesis_block); + assert_eq!(context.epoch_length, epoch_length); + assert_eq!(context.dapp_metadata, DAppMetadata::default()); + assert_eq!(context.inputs_sent, number_of_inputs_sent); + assert_eq!(context.last_input_epoch, last_input_epoch); + assert_eq!(context.last_finished_epoch, last_finished_epoch); + } + + #[test] + #[should_panic] + fn new_should_panic_because_epoch_length_is_zero() { + Context::new( + 0, + 0, + DAppMetadata::default(), + DispatcherMetrics::default(), + 0, + None, + None, ); } @@ -335,17 +383,10 @@ mod public_tests { #[test] fn inputs_sent_count() { - let inputs_sent_count = 42; - let context = Context { - inputs_sent_count, - last_event_is_finish_epoch: false, // ignored - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert_eq!(context.inputs_sent_count(), inputs_sent_count); + let number_of_inputs_sent = 42; + let mut context = Context::default(); + context.inputs_sent = number_of_inputs_sent; + assert_eq!(context.inputs_sent(), number_of_inputs_sent); } // -------------------------------------------------------------------------------------------- @@ -354,52 +395,40 @@ mod public_tests { #[tokio::test] async fn finish_epoch_if_needed_true() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(12, &broker).await; assert!(result.is_ok()); - broker - .assert_send_interactions(vec![SendInteraction::FinishedEpoch(1)]); + broker.assert_state(vec![ + Sent::Finish, // + ]); } #[tokio::test] async fn finish_epoch_if_needed_false() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 2, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(3, &broker).await; + let result = context.finish_epoch_if_needed(9, &broker).await; assert!(result.is_ok()); - broker.assert_send_interactions(vec![]); + broker.assert_state(vec![]); } #[tokio::test] async fn finish_epoch_if_needed_broker_error() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(28, &broker).await; assert!(result.is_err()); } @@ -409,40 +438,215 @@ mod public_tests { #[tokio::test] async fn enqueue_input_ok() { - let inputs_sent_count = 42; - let mut context = Context { - inputs_sent_count, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let input = mock::new_input(2); + let number_of_inputs_sent = 42; + let last_input_epoch = Some(1); + let last_finished_epoch = None; + + let mut context = Context::default(); + context.inputs_sent = number_of_inputs_sent; + context.last_input_epoch = last_input_epoch; + context.last_finished_epoch = last_finished_epoch; + + let input = mock::new_input(22); let broker = mock::Broker::new(vec![], vec![]); let result = context.enqueue_input(&input, &broker).await; assert!(result.is_ok()); - assert_eq!(context.inputs_sent_count, inputs_sent_count + 1); - assert!(!context.last_event_is_finish_epoch); - broker.assert_send_interactions(vec![SendInteraction::EnqueuedInput( - inputs_sent_count, - )]); + + assert_eq!(context.inputs_sent, number_of_inputs_sent + 1); + assert_eq!(context.last_input_epoch, Some(2)); + assert_eq!(context.last_finished_epoch, Some(1)); + + broker.assert_state(vec![ + Sent::Finish, + Sent::Input(number_of_inputs_sent), + ]); } #[tokio::test] async fn enqueue_input_broker_error() { - let mut context = Context { - inputs_sent_count: 42, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); let broker = mock::Broker::with_enqueue_input_error(); - let result = context.enqueue_input(&mock::new_input(2), &broker).await; + let result = context.enqueue_input(&mock::new_input(82), &broker).await; assert!(result.is_err()); } + + // -------------------------------------------------------------------------------------------- + // deterministic behavior + // -------------------------------------------------------------------------------------------- + + #[derive(Clone)] + struct Case { + input_blocks: Vec, + epoch_length: u64, + last_block: u64, + expected: Vec, + } + + #[tokio::test] + #[serial] + async fn deterministic_behavior() { + let cases: Vec = vec![ + Case { + input_blocks: vec![], + epoch_length: 2, + last_block: 100, + expected: vec![], + }, + Case { + input_blocks: vec![0, 1, 4, 5], + epoch_length: 2, + last_block: 10, + expected: vec![ + Sent::Input(0), + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Input(3), + Sent::Finish, + ], + }, + Case { + input_blocks: vec![0, 2], + epoch_length: 2, + last_block: 4, + expected: vec![ + Sent::Input(0), + Sent::Finish, + Sent::Input(1), + Sent::Finish, + ], + }, + Case { + input_blocks: vec![1, 2, 4], + epoch_length: 2, + last_block: 6, + expected: vec![ + Sent::Input(0), + Sent::Finish, + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Finish, + ], + }, + Case { + input_blocks: vec![0, 1, 2, 3, 4, 5, 6, 7], + epoch_length: 2, + last_block: 7, + expected: vec![ + Sent::Input(0), + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Input(3), + Sent::Finish, + Sent::Input(4), + Sent::Input(5), + Sent::Finish, + Sent::Input(6), + Sent::Input(7), + ], + }, + Case { + input_blocks: vec![0, 5, 9], + epoch_length: 2, + last_block: 10, + expected: vec![ + Sent::Input(0), + Sent::Finish, + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Finish, + ], + }, + ]; + for (i, case) in cases.iter().enumerate() { + println!("Testing case {}.", i); + test_deterministic_case(case.clone()).await; + } + } + + // -------------------------------------------------------------------------------------------- + // auxiliary + // -------------------------------------------------------------------------------------------- + + async fn test_deterministic_case(case: Case) { + let broker1 = one_at_a_time( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + let broker2 = all_at_once( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + broker1.assert_state(case.expected.clone()); + broker2.assert_state(case.expected.clone()); + } + + async fn one_at_a_time( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("================================================"); + println!("one_block_at_a_time:"); + + let mut input_blocks: VecDeque<_> = input_blocks.into(); + let mut current_input_block = input_blocks.pop_front(); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in 0..=last_block { + if let Some(input_block) = current_input_block { + if block == input_block { + println!("\tenqueue_input(input_block: {})", block); + let input = mock::new_input(block); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + + current_input_block = input_blocks.pop_front(); + } + } + + println!("\tfinish_epoch_if_needed(block: {})\n", block); + let result = context.finish_epoch_if_needed(block, &broker).await; + assert!(result.is_ok()); + } + + broker + } + + async fn all_at_once( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("all_inputs_at_once:"); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in input_blocks { + println!("\tenqueue_input(input_block: {})\n", block); + let input = mock::new_input(block); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + } + + println!("\tfinish_epoch_if_needed(last_block: {})", last_block); + let result = context.finish_epoch_if_needed(last_block, &broker).await; + assert!(result.is_ok()); + + println!("================================================"); + + broker + } } diff --git a/offchain/dispatcher/src/drivers/machine.rs b/offchain/dispatcher/src/drivers/machine.rs index 3f22f97f6..6e0036997 100644 --- a/offchain/dispatcher/src/drivers/machine.rs +++ b/offchain/dispatcher/src/drivers/machine.rs @@ -6,9 +6,9 @@ use super::Context; use crate::machine::{rollups_broker::BrokerFacadeError, BrokerSend}; use eth_state_fold_types::{ethereum_types::Address, Block}; -use types::foldables::{DAppInputBox, Input, InputBox}; +use types::foldables::{DAppInputBox, InputBox}; -use tracing::{debug, instrument, trace}; +use tracing::{debug, instrument}; pub struct MachineDriver { dapp_address: Address, @@ -27,21 +27,17 @@ impl MachineDriver { input_box: &InputBox, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - let dapp_input_box = - match input_box.dapp_input_boxes.get(&self.dapp_address) { - None => { - debug!("No inputs for dapp {}", self.dapp_address); - return Ok(()); - } - - Some(d) => d, - }; - - self.process_inputs(context, dapp_input_box, broker).await?; + match input_box.dapp_input_boxes.get(&self.dapp_address) { + None => { + debug!("No inputs for dapp {}", self.dapp_address); + } + Some(dapp_input_box) => { + self.process_inputs(context, dapp_input_box, broker).await? + } + }; - context - .finish_epoch_if_needed(block.timestamp.as_u64(), broker) - .await?; + let block = block.number.as_u64(); + context.finish_epoch_if_needed(block, broker).await?; Ok(()) } @@ -57,163 +53,63 @@ impl MachineDriver { ) -> Result<(), BrokerFacadeError> { tracing::trace!( "Last input sent to machine manager `{}`, current input `{}`", - context.inputs_sent_count(), + context.inputs_sent(), dapp_input_box.inputs.len() ); - let input_slice = dapp_input_box - .inputs - .skip(context.inputs_sent_count() as usize); + let input_slice = + dapp_input_box.inputs.skip(context.inputs_sent() as usize); for input in input_slice { - self.process_input(context, &input, broker).await?; + context.enqueue_input(&input, broker).await?; } Ok(()) } - - #[instrument(level = "trace", skip_all)] - async fn process_input( - &self, - context: &mut Context, - input: &Input, - broker: &impl BrokerSend, - ) -> Result<(), BrokerFacadeError> { - let input_timestamp = input.block_added.timestamp.as_u64(); - trace!(?context, ?input_timestamp); - - context - .finish_epoch_if_needed(input_timestamp, broker) - .await?; - - context.enqueue_input(input, broker).await?; - - Ok(()) - } } #[cfg(test)] mod tests { - use eth_state_fold_types::{ethereum_types::H160, Block}; - use rollups_events::DAppMetadata; use std::sync::Arc; + use eth_state_fold_types::ethereum_types::H160; + use rollups_events::DAppMetadata; + use types::foldables::InputBox; + use crate::{ drivers::{ - mock::{self, SendInteraction}, + machine::MachineDriver, + mock::{self, Broker}, Context, }, machine::RollupStatus, metrics::DispatcherMetrics, }; - use super::MachineDriver; - - // -------------------------------------------------------------------------------------------- - // process_input - // -------------------------------------------------------------------------------------------- - - async fn test_process_input( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, + fn new_context1( + genesis_block: u64, + epoch_length: u64, + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, + ) -> Context { + Context::new( + genesis_block, + epoch_length, DAppMetadata::default(), DispatcherMetrics::default(), - rollup_status, - ); - let machine_driver = MachineDriver::new(H160::random()); - for block_timestamp in input_timestamps { - let input = mock::new_input(block_timestamp); - let result = machine_driver - .process_input(&mut context, &input, &broker) - .await; - assert!(result.is_ok()); - } - - broker.assert_send_interactions(expected); + inputs_sent, + last_input_epoch, + last_finished_epoch, + ) } - #[tokio::test] - async fn process_input_right_before_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_at_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_last_event_is_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: true, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_after_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![6, 7]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(3), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_crossing_two_epochs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![3, 4, 5, 6, 7, 9, 10, 11]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - SendInteraction::EnqueuedInput(5), - SendInteraction::FinishedEpoch(6), - SendInteraction::EnqueuedInput(6), - SendInteraction::EnqueuedInput(7), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; + fn new_context2( + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, + ) -> Context { + new_context1(0, 10, inputs_sent, last_input_epoch, last_finished_epoch) } // -------------------------------------------------------------------------------------------- @@ -221,23 +117,19 @@ mod tests { // -------------------------------------------------------------------------------------------- async fn test_process_inputs( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, + mut context: Context, + input_blocks: Vec, + expected: Vec, ) { + let rollup_status = RollupStatus { + number_of_inputs_sent: context.inputs_sent(), + }; let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); let machine_driver = MachineDriver::new(H160::random()); let dapp_input_box = types::foldables::DAppInputBox { - inputs: input_timestamps + inputs: input_blocks .iter() - .map(|timestamp| Arc::new(mock::new_input(*timestamp))) + .map(|block| Arc::new(mock::new_input(*block))) .collect::>() .into(), }; @@ -246,48 +138,36 @@ mod tests { .await; assert!(result.is_ok()); - broker.assert_send_interactions(expected); + broker.assert_state(expected); } #[tokio::test] - async fn test_process_inputs_without_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), + async fn test_process_inputs_without_skipping_inputs() { + let context = new_context2(0, None, None); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![ + mock::Sent::Input(0), + mock::Sent::Input(1), + mock::Sent::Input(2), + mock::Sent::Input(3), ]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + test_process_inputs(context, input_blocks, expected).await; } #[tokio::test] - async fn process_inputs_with_some_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(3)]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + async fn process_inputs_with_some_skipped_inputs() { + let context = new_context2(2, Some(0), None); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![mock::Sent::Input(2), mock::Sent::Input(3)]; + test_process_inputs(context, input_blocks, expected).await; } #[tokio::test] - async fn process_inputs_skipping_all() { - let rollup_status = RollupStatus { - inputs_sent_count: 4, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + async fn process_inputs_skipping_all_inputs() { + let context = new_context2(4, Some(0), None); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![]; + test_process_inputs(context, input_blocks, expected).await; } // -------------------------------------------------------------------------------------------- @@ -295,123 +175,236 @@ mod tests { // -------------------------------------------------------------------------------------------- async fn test_react( - block: Block, - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - + block: u64, + mut context: Context, + broker: Option, + input_box: Option, + input_blocks: Vec, + expected: Vec, + ) -> (Context, Broker, InputBox) { + let rollup_status = RollupStatus { + number_of_inputs_sent: context.inputs_sent(), + }; + let broker = broker + .unwrap_or(mock::Broker::new(vec![rollup_status], Vec::new())); let dapp_address = H160::random(); let machine_driver = MachineDriver::new(dapp_address); - let input_box = mock::new_input_box(); + let input_box = input_box.unwrap_or(mock::new_input_box()); let input_box = - mock::update_input_box(input_box, dapp_address, input_timestamps); + mock::update_input_box(input_box, dapp_address, input_blocks); let result = machine_driver - .react(&mut context, &block, &input_box, &broker) + .react(&mut context, &mock::new_block(block), &input_box, &broker) .await; assert!(result.is_ok()); - broker.assert_send_interactions(expected); + broker.assert_state(expected); + + (context, broker, input_box) } #[tokio::test] async fn react_without_finish_epoch() { - let block = mock::new_block(3); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + let block = 3; + let context = new_context2(0, None, None); + let input_blocks = vec![1, 2]; + let expected = vec![mock::Sent::Input(0), mock::Sent::Input(1)]; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_with_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), + let block = 10; + let context = new_context2(0, None, None); + let input_blocks = vec![1, 2]; + let expected = vec![ + mock::Sent::Input(0), + mock::Sent::Input(1), + mock::Sent::Finish, ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_with_internal_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4, 5]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), + let block = 14; + let context = new_context2(0, None, None); + let input_blocks = vec![9, 10]; + let expected = vec![ + mock::Sent::Input(0), + mock::Sent::Finish, + mock::Sent::Input(1), ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_without_inputs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - let block = mock::new_block(5); - let input_box = mock::new_input_box(); - let machine_driver = MachineDriver::new(H160::random()); - let result = machine_driver - .react(&mut context, &block, &input_box, &broker) - .await; - assert!(result.is_ok()); - broker.assert_send_interactions(vec![]); + let block = 10; + let context = new_context2(0, None, None); + let input_blocks = vec![]; + let expected = vec![]; + test_react(block, context, None, None, input_blocks, expected).await; } + // NOTE: this test shows we DON'T close the epoch after the first input! #[tokio::test] async fn react_with_inputs_after_first_epoch_length() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![7, 8]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), + let block = 20; + let context = new_context2(0, None, None); + let input_blocks = vec![14, 16, 18, 20]; + let expected = vec![ + mock::Sent::Input(0), + mock::Sent::Input(1), + mock::Sent::Input(2), + mock::Sent::Finish, + mock::Sent::Input(3), + ]; + test_react(block, context, None, None, input_blocks, expected).await; + } + + #[tokio::test] + async fn react_is_deterministic() { + let final_expected = vec![ + mock::Sent::Input(0), + mock::Sent::Finish, + mock::Sent::Input(1), + mock::Sent::Input(2), + mock::Sent::Input(3), + mock::Sent::Input(4), + mock::Sent::Input(5), + mock::Sent::Input(6), + mock::Sent::Input(7), + mock::Sent::Input(8), + mock::Sent::Input(9), + mock::Sent::Finish, + mock::Sent::Input(10), + mock::Sent::Input(11), + mock::Sent::Input(12), + mock::Sent::Input(13), + mock::Sent::Input(14), + mock::Sent::Input(15), + mock::Sent::Finish, + mock::Sent::Input(16), + mock::Sent::Input(17), + mock::Sent::Input(18), ]; - test_react(block, rollup_status, input_timestamps, send_interactions) + + { + // original + let block1 = 3100; + let block2 = 6944; + + let context = new_context1(0, 1000, 0, None, None); + + let input_blocks1 = vec![ + 56, // + // + 1078, // + 1091, // + 1159, // + 1204, // + 1227, // + 1280, // + 1298, // + 1442, // + 1637, // + // + 2827, // + 2881, // + 2883, // + 2887, // + 2891, // + 2934, // + ]; + let mut input_blocks2 = input_blocks1.clone(); + input_blocks2.append(&mut vec![ + 6160, // + 6864, // + 6944, // + ]); + + let expected1 = vec![ + mock::Sent::Input(0), + mock::Sent::Finish, + mock::Sent::Input(1), + mock::Sent::Input(2), + mock::Sent::Input(3), + mock::Sent::Input(4), + mock::Sent::Input(5), + mock::Sent::Input(6), + mock::Sent::Input(7), + mock::Sent::Input(8), + mock::Sent::Input(9), + mock::Sent::Finish, + mock::Sent::Input(10), + mock::Sent::Input(11), + mock::Sent::Input(12), + mock::Sent::Input(13), + mock::Sent::Input(14), + mock::Sent::Input(15), + mock::Sent::Finish, + ]; + + let (context, broker, input_box) = test_react( + block1, + context, + None, + None, + input_blocks1, + expected1, + ) + .await; + + test_react( + block2, + context, + Some(broker), + Some(input_box), + input_blocks2, + final_expected.clone(), + ) .await; + } + + { + // reconstruction + let block = 6944; + let context = new_context1(0, 1000, 0, None, None); + let input_blocks = vec![ + 56, // + // + 1078, // + 1091, // + 1159, // + 1204, // + 1227, // + 1280, // + 1298, // + 1442, // + 1637, // + // + 2827, // + 2881, // + 2883, // + 2887, // + 2891, // + 2934, // + // + 6160, // + 6864, // + 6944, // + ]; + test_react( + block, + context, + None, + None, + input_blocks, + final_expected, + ) + .await; + } } } diff --git a/offchain/dispatcher/src/drivers/mock.rs b/offchain/dispatcher/src/drivers/mock.rs index d56acd616..e631b811d 100644 --- a/offchain/dispatcher/src/drivers/mock.rs +++ b/offchain/dispatcher/src/drivers/mock.rs @@ -24,21 +24,21 @@ use crate::machine::{ // auxiliary functions // ------------------------------------------------------------------------------------------------ -pub fn new_block(timestamp: u32) -> Block { +pub fn new_block(number: u64) -> Block { Block { hash: H256::random(), - number: 0.into(), + number: number.into(), parent_hash: H256::random(), - timestamp: timestamp.into(), + timestamp: 0.into(), logs_bloom: Bloom::default(), } } -pub fn new_input(timestamp: u32) -> Input { +pub fn new_input(block: u64) -> Input { Input { sender: Arc::new(H160::random()), payload: vec![], - block_added: Arc::new(new_block(timestamp)), + block_added: Arc::new(new_block(block)), dapp: Arc::new(H160::random()), tx_hash: Arc::new(H256::default()), } @@ -55,11 +55,11 @@ pub fn new_input_box() -> InputBox { pub fn update_input_box( input_box: InputBox, dapp_address: Address, - timestamps: Vec, + blocks: Vec, ) -> InputBox { - let inputs = timestamps + let inputs = blocks .iter() - .map(|timestamp| Arc::new(new_input(*timestamp))) + .map(|block| Arc::new(new_input(*block))) .collect::>(); let inputs = Vector::from(inputs); let dapp_input_boxes = input_box @@ -77,16 +77,16 @@ pub fn update_input_box( // ------------------------------------------------------------------------------------------------ #[derive(Debug, Clone, Copy, PartialEq)] -pub enum SendInteraction { - EnqueuedInput(u64), - FinishedEpoch(u64), +pub enum Sent { + Input(u64), // input index + Finish, } #[derive(Debug)] pub struct Broker { pub rollup_statuses: Mutex>, pub next_claims: Mutex>, - pub send_interactions: Mutex>, + pub sents: Mutex>, status_error: bool, enqueue_input_error: bool, finish_epoch_error: bool, @@ -97,7 +97,7 @@ impl Broker { Self { rollup_statuses: Mutex::new(VecDeque::new()), next_claims: Mutex::new(VecDeque::new()), - send_interactions: Mutex::new(Vec::new()), + sents: Mutex::new(Vec::new()), status_error: false, enqueue_input_error: false, finish_epoch_error: false, @@ -126,28 +126,29 @@ impl Broker { broker } - fn send_interactions_len(&self) -> usize { - let mutex_guard = self.send_interactions.lock().unwrap(); + fn sents_len(&self) -> usize { + let mutex_guard = self.sents.lock().unwrap(); mutex_guard.deref().len() } - fn get_send_interaction(&self, i: usize) -> SendInteraction { - let mutex_guard = self.send_interactions.lock().unwrap(); + fn get_sent(&self, i: usize) -> Sent { + let mutex_guard = self.sents.lock().unwrap(); mutex_guard.deref().get(i).unwrap().clone() } - pub fn assert_send_interactions(&self, expected: Vec) { + pub fn assert_state(&self, expected: Vec) { assert_eq!( - self.send_interactions_len(), + self.sents_len(), expected.len(), - "{:?}", - self.send_interactions + "\n{:?}\n{:?}", + self.sents.lock().unwrap().deref(), + expected ); - println!("Send interactions:"); + println!("Sents:"); for (i, expected) in expected.iter().enumerate() { - let send_interaction = self.get_send_interaction(i); - println!("{:?} - {:?}", send_interaction, expected); - assert_eq!(send_interaction, *expected); + let sent = self.get_sent(i); + println!("index: {:?} => {:?} - {:?}", i, sent, expected); + assert_eq!(sent, *expected); } } } @@ -174,25 +175,18 @@ impl BrokerSend for Broker { if self.enqueue_input_error { whatever!("enqueue_input error") } else { - let mut mutex_guard = self.send_interactions.lock().unwrap(); - mutex_guard - .deref_mut() - .push(SendInteraction::EnqueuedInput(input_index)); + let mut mutex_guard = self.sents.lock().unwrap(); + mutex_guard.deref_mut().push(Sent::Input(input_index)); Ok(()) } } - async fn finish_epoch( - &self, - inputs_sent_count: u64, - ) -> Result<(), BrokerFacadeError> { + async fn finish_epoch(&self, _: u64) -> Result<(), BrokerFacadeError> { if self.finish_epoch_error { whatever!("finish_epoch error") } else { - let mut mutex_guard = self.send_interactions.lock().unwrap(); - mutex_guard - .deref_mut() - .push(SendInteraction::FinishedEpoch(inputs_sent_count)); + let mut mutex_guard = self.sents.lock().unwrap(); + mutex_guard.deref_mut().push(Sent::Finish); Ok(()) } } diff --git a/offchain/dispatcher/src/machine/mod.rs b/offchain/dispatcher/src/machine/mod.rs index 735fdff23..59a54c131 100644 --- a/offchain/dispatcher/src/machine/mod.rs +++ b/offchain/dispatcher/src/machine/mod.rs @@ -11,8 +11,7 @@ use self::rollups_broker::BrokerFacadeError; #[derive(Debug, Clone, Copy, Default)] pub struct RollupStatus { - pub inputs_sent_count: u64, - pub last_event_is_finish_epoch: bool, + pub number_of_inputs_sent: u64, } #[async_trait] diff --git a/offchain/dispatcher/src/machine/rollups_broker.rs b/offchain/dispatcher/src/machine/rollups_broker.rs index e430e4fe7..bdd9d2fac 100644 --- a/offchain/dispatcher/src/machine/rollups_broker.rs +++ b/offchain/dispatcher/src/machine/rollups_broker.rs @@ -192,13 +192,11 @@ impl From for RollupStatus { match payload.data { RollupsData::AdvanceStateInput { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: false, + number_of_inputs_sent: inputs_sent_count, }, RollupsData::FinishEpoch { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: true, + number_of_inputs_sent: inputs_sent_count, }, } } @@ -249,7 +247,7 @@ fn build_next_input( block_number: input.block_added.number.as_u64(), timestamp: input.block_added.timestamp.as_u64(), epoch_index: 0, - input_index: status.status.inputs_sent_count, + input_index: status.status.number_of_inputs_sent, }; let data = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { @@ -261,7 +259,7 @@ fn build_next_input( RollupsInput { parent_id: status.id.clone(), epoch_index: status.epoch_number, - inputs_sent_count: status.status.inputs_sent_count + 1, + inputs_sent_count: status.status.number_of_inputs_sent + 1, data, } } @@ -270,261 +268,261 @@ fn build_next_finish_epoch(status: &BrokerStreamStatus) -> RollupsInput { RollupsInput { parent_id: status.id.clone(), epoch_index: status.epoch_number, - inputs_sent_count: status.status.inputs_sent_count, + inputs_sent_count: status.status.number_of_inputs_sent, data: RollupsData::FinishEpoch {}, } } -#[cfg(test)] -mod broker_facade_tests { - use std::{sync::Arc, time::Duration}; - - use backoff::ExponentialBackoffBuilder; - use eth_state_fold_types::{ - ethereum_types::{Bloom, H160, H256, U256, U64}, - Block, - }; - use rollups_events::{ - BrokerConfig, BrokerEndpoint, DAppMetadata, Hash, InputMetadata, - Payload, RedactedUrl, RollupsAdvanceStateInput, RollupsData, Url, - }; - use test_fixtures::broker::BrokerFixture; - use testcontainers::clients::Cli; - use types::foldables::Input; - - use crate::machine::{ - rollups_broker::BrokerFacadeError, BrokerSend, BrokerStatus, - }; - - use super::BrokerFacade; - - // -------------------------------------------------------------------------------------------- - // new - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn new_ok() { - let docker = Cli::default(); - let (_fixture, _broker) = setup(&docker).await; - } - - #[tokio::test] - async fn new_error() { - let docker = Cli::default(); - let error = failable_setup(&docker, true) - .await - .err() - .expect("'status' function has not failed") - .to_string(); - // BrokerFacadeError::BrokerConnectionError - assert_eq!(error, "error connecting to the broker"); - } - - // -------------------------------------------------------------------------------------------- - // status - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn status_inputs_sent_count_equals_0() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 0); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_sent_count_equals_1() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 1).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 1); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_sent_count_equals_10() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 10).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 10); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_is_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 0); - assert!(status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_with_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 5).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 5); - assert!(status.last_event_is_finish_epoch); - } - - // -------------------------------------------------------------------------------------------- - // enqueue_input - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn enqueue_input_ok() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - for i in 0..3 { - assert!(broker - .enqueue_input(i, &new_enqueue_input()) - .await - .is_ok()); - } - } - - #[tokio::test] - #[should_panic( - expected = "assertion `left == right` failed\n left: 1\n right: 6" - )] - async fn enqueue_input_assertion_error_1() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - let _ = broker.enqueue_input(5, &new_enqueue_input()).await; - } - - #[tokio::test] - #[should_panic( - expected = "assertion `left == right` failed\n left: 5\n right: 6" - )] - async fn enqueue_input_assertion_error_2() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - for i in 0..4 { - assert!(broker - .enqueue_input(i, &new_enqueue_input()) - .await - .is_ok()); - } - let _ = broker.enqueue_input(5, &new_enqueue_input()).await; - } - - // NOTE: cannot test result error because the dependency is not injectable. - - // -------------------------------------------------------------------------------------------- - // finish_epoch - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn finish_epoch_ok_1() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - assert!(broker.finish_epoch(0).await.is_ok()); - // BONUS TEST: testing for a finished epoch with no inputs - assert!(broker.finish_epoch(0).await.is_ok()); - } - - #[tokio::test] - async fn finish_epoch_ok_2() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - let first_epoch_inputs = 3; - produce_advance_state_inputs(&fixture, first_epoch_inputs).await; - produce_finish_epoch_input(&fixture).await; - let second_epoch_inputs = 7; - produce_advance_state_inputs(&fixture, second_epoch_inputs).await; - let total_inputs = first_epoch_inputs + second_epoch_inputs; - assert!(broker.finish_epoch(total_inputs as u64).await.is_ok()); - } - - #[tokio::test] - #[should_panic( - expected = "assertion `left == right` failed\n left: 0\n right: 1" - )] - async fn finish_epoch_assertion_error() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - let _ = broker.finish_epoch(1).await; - } - - // NOTE: cannot test result error because the dependency is not injectable. - - // -------------------------------------------------------------------------------------------- - // auxiliary - // -------------------------------------------------------------------------------------------- - - async fn failable_setup( - docker: &Cli, - should_fail: bool, - ) -> Result<(BrokerFixture, BrokerFacade), BrokerFacadeError> { - let fixture = BrokerFixture::setup(docker).await; - let redis_endpoint = if should_fail { - BrokerEndpoint::Single(RedactedUrl::new( - Url::parse("https://invalid.com").unwrap(), - )) - } else { - fixture.redis_endpoint().clone() - }; - let config = BrokerConfig { - redis_endpoint, - consume_timeout: 300000, - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(1000)) - .with_max_elapsed_time(Some(Duration::from_millis(3000))) - .build(), - }; - let metadata = DAppMetadata { - chain_id: fixture.chain_id(), - dapp_address: fixture.dapp_address().clone(), - }; - let broker = BrokerFacade::new(config, metadata).await?; - Ok((fixture, broker)) - } - - async fn setup(docker: &Cli) -> (BrokerFixture, BrokerFacade) { - failable_setup(docker, false).await.unwrap() - } - - fn new_enqueue_input() -> Input { - Input { - sender: Arc::new(H160::random()), - payload: vec![], - block_added: Arc::new(Block { - hash: H256::random(), - number: U64::zero(), - parent_hash: H256::random(), - timestamp: U256::zero(), - logs_bloom: Bloom::default(), - }), - dapp: Arc::new(H160::random()), - tx_hash: Arc::new(H256::random()), - } - } - - async fn produce_advance_state_inputs(fixture: &BrokerFixture<'_>, n: u32) { - for _ in 0..n { - let _ = fixture - .produce_input_event(RollupsData::AdvanceStateInput( - RollupsAdvanceStateInput { - metadata: InputMetadata::default(), - payload: Payload::default(), - tx_hash: Hash::default(), - }, - )) - .await; - } - } - - async fn produce_finish_epoch_input(fixture: &BrokerFixture<'_>) { - let _ = fixture - .produce_input_event(RollupsData::FinishEpoch {}) - .await; - } -} +// #[cfg(test)] +// mod broker_facade_tests { +// use std::{sync::Arc, time::Duration}; +// +// use backoff::ExponentialBackoffBuilder; +// use eth_state_fold_types::{ +// ethereum_types::{Bloom, H160, H256, U256, U64}, +// Block, +// }; +// use rollups_events::{ +// BrokerConfig, BrokerEndpoint, DAppMetadata, Hash, InputMetadata, +// Payload, RedactedUrl, RollupsAdvanceStateInput, RollupsData, Url, +// }; +// use test_fixtures::broker::BrokerFixture; +// use testcontainers::clients::Cli; +// use types::foldables::Input; +// +// use crate::machine::{ +// rollups_broker::BrokerFacadeError, BrokerSend, BrokerStatus, +// }; +// +// use super::BrokerFacade; +// +// // -------------------------------------------------------------------------------------------- +// // new +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn new_ok() { +// let docker = Cli::default(); +// let (_fixture, _broker) = setup(&docker).await; +// } +// +// #[tokio::test] +// async fn new_error() { +// let docker = Cli::default(); +// let error = failable_setup(&docker, true) +// .await +// .err() +// .expect("'status' function has not failed") +// .to_string(); +// // BrokerFacadeError::BrokerConnectionError +// assert_eq!(error, "error connecting to the broker"); +// } +// +// // -------------------------------------------------------------------------------------------- +// // status +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn status_inputs_sent_count_equals_0() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 0); +// assert!(!status.last_event_is_finish_epoch); +// } +// +// #[tokio::test] +// async fn status_inputs_sent_count_equals_1() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_advance_state_inputs(&fixture, 1).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 1); +// assert!(!status.last_event_is_finish_epoch); +// } +// +// #[tokio::test] +// async fn status_inputs_sent_count_equals_10() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_advance_state_inputs(&fixture, 10).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 10); +// assert!(!status.number_of_inputs_sent); +// } +// +// #[tokio::test] +// async fn status_is_finish_epoch() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_finish_epoch_input(&fixture).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 0); +// assert!(status.last_event_is_finish_epoch); +// } +// +// #[tokio::test] +// async fn status_inputs_with_finish_epoch() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_advance_state_inputs(&fixture, 5).await; +// produce_finish_epoch_input(&fixture).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.last_inputs_sent, 5); +// assert!(status.last_event_is_finish_epoch); +// } +// +// // -------------------------------------------------------------------------------------------- +// // enqueue_input +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn enqueue_input_ok() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// for i in 0..3 { +// assert!(broker +// .enqueue_input(i, &new_enqueue_input()) +// .await +// .is_ok()); +// } +// } +// +// #[tokio::test] +// #[should_panic( +// expected = "assertion `left == right` failed\n left: 1\n right: 6" +// )] +// async fn enqueue_input_assertion_error_1() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// let _ = broker.enqueue_input(5, &new_enqueue_input()).await; +// } +// +// #[tokio::test] +// #[should_panic( +// expected = "assertion `left == right` failed\n left: 5\n right: 6" +// )] +// async fn enqueue_input_assertion_error_2() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// for i in 0..4 { +// assert!(broker +// .enqueue_input(i, &new_enqueue_input()) +// .await +// .is_ok()); +// } +// let _ = broker.enqueue_input(5, &new_enqueue_input()).await; +// } +// +// // NOTE: cannot test result error because the dependency is not injectable. +// +// // -------------------------------------------------------------------------------------------- +// // finish_epoch +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn finish_epoch_ok_1() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// assert!(broker.finish_epoch(0).await.is_ok()); +// // BONUS TEST: testing for a finished epoch with no inputs +// assert!(broker.finish_epoch(0).await.is_ok()); +// } +// +// #[tokio::test] +// async fn finish_epoch_ok_2() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// let first_epoch_inputs = 3; +// produce_advance_state_inputs(&fixture, first_epoch_inputs).await; +// produce_finish_epoch_input(&fixture).await; +// let second_epoch_inputs = 7; +// produce_advance_state_inputs(&fixture, second_epoch_inputs).await; +// let total_inputs = first_epoch_inputs + second_epoch_inputs; +// assert!(broker.finish_epoch(total_inputs as u64).await.is_ok()); +// } +// +// #[tokio::test] +// #[should_panic( +// expected = "assertion `left == right` failed\n left: 0\n right: 1" +// )] +// async fn finish_epoch_assertion_error() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// let _ = broker.finish_epoch(1).await; +// } +// +// // NOTE: cannot test result error because the dependency is not injectable. +// +// // -------------------------------------------------------------------------------------------- +// // auxiliary +// // -------------------------------------------------------------------------------------------- +// +// async fn failable_setup( +// docker: &Cli, +// should_fail: bool, +// ) -> Result<(BrokerFixture, BrokerFacade), BrokerFacadeError> { +// let fixture = BrokerFixture::setup(docker).await; +// let redis_endpoint = if should_fail { +// BrokerEndpoint::Single(RedactedUrl::new( +// Url::parse("https://invalid.com").unwrap(), +// )) +// } else { +// fixture.redis_endpoint().clone() +// }; +// let config = BrokerConfig { +// redis_endpoint, +// consume_timeout: 300000, +// backoff: ExponentialBackoffBuilder::new() +// .with_initial_interval(Duration::from_millis(1000)) +// .with_max_elapsed_time(Some(Duration::from_millis(3000))) +// .build(), +// }; +// let metadata = DAppMetadata { +// chain_id: fixture.chain_id(), +// dapp_address: fixture.dapp_address().clone(), +// }; +// let broker = BrokerFacade::new(config, metadata).await?; +// Ok((fixture, broker)) +// } +// +// async fn setup(docker: &Cli) -> (BrokerFixture, BrokerFacade) { +// failable_setup(docker, false).await.unwrap() +// } +// +// fn new_enqueue_input() -> Input { +// Input { +// sender: Arc::new(H160::random()), +// payload: vec![], +// block_added: Arc::new(Block { +// hash: H256::random(), +// number: U64::zero(), +// parent_hash: H256::random(), +// timestamp: U256::zero(), +// logs_bloom: Bloom::default(), +// }), +// dapp: Arc::new(H160::random()), +// tx_hash: Arc::new(H256::random()), +// } +// } +// +// async fn produce_advance_state_inputs(fixture: &BrokerFixture<'_>, n: u32) { +// for _ in 0..n { +// let _ = fixture +// .produce_input_event(RollupsData::AdvanceStateInput( +// RollupsAdvanceStateInput { +// metadata: InputMetadata::default(), +// payload: Payload::default(), +// tx_hash: Hash::default(), +// }, +// )) +// .await; +// } +// } +// +// async fn produce_finish_epoch_input(fixture: &BrokerFixture<'_>) { +// let _ = fixture +// .produce_input_event(RollupsData::FinishEpoch {}) +// .await; +// } +// } diff --git a/offchain/dispatcher/src/setup.rs b/offchain/dispatcher/src/setup.rs index 97f632168..872865aa1 100644 --- a/offchain/dispatcher/src/setup.rs +++ b/offchain/dispatcher/src/setup.rs @@ -82,11 +82,11 @@ pub async fn create_context( ) -> Result { let dapp_deployment_block_number = U64::from(config.blockchain_config.dapp_deployment_block_number); - let genesis_timestamp: u64 = block_server + let genesis_block = block_server .query_block(dapp_deployment_block_number) .await .context(StateServerSnafu)? - .timestamp + .number .as_u64(); let epoch_length = config.epoch_duration; @@ -94,14 +94,16 @@ pub async fn create_context( // The dispatcher doesn't work properly if there are inputs in the broker from a previous run. // Hence, we make sure that the broker is in a clean state before starting. - ensure!(status.inputs_sent_count == 0, DirtyBrokerSnafu); + ensure!(status.number_of_inputs_sent == 0, DirtyBrokerSnafu); let context = Context::new( - genesis_timestamp, + genesis_block, epoch_length, dapp_metadata, metrics, - status, + 0, + None, + None, ); Ok(context)