diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c619d8ec..e2217cff9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,12 +6,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +<<<<<<< HEAD ## [Unreleased] ### Added - Added Rollups end-to-end test using Echo Dapp +### Changed + +- Changed the dispatcher to close epochs based on block numbers instead of block timestamps. +- Renamed `CARTESI_EPOCH_DURATION` to `CARTESI_EPOCH_LENGTH`, and set its default value to 5760 (1 day worth of blocks, in average). + ## [1.4.0] 2024-04-09 ### Added diff --git a/docs/config.md b/docs/config.md index ecb988b07..4820319af 100644 --- a/docs/config.md +++ b/docs/config.md @@ -236,14 +236,14 @@ for more information. * **Type:** `string` * **Default:** `""` -## `CARTESI_EPOCH_DURATION` +## `CARTESI_EPOCH_LENGTH` -Duration of a rollups epoch in seconds. +Length of a rollups epoch in blocks. At the end of each epoch, the node will send claims to the blockchain. -* **Type:** `Duration` -* **Default:** `"86400"` +* **Type:** `uint64` +* **Default:** `"5760"` ## `CARTESI_SNAPSHOT_DIR` diff --git a/internal/node/config/generate/Config.toml b/internal/node/config/generate/Config.toml index 175b2c754..54cb86149 100644 --- a/internal/node/config/generate/Config.toml +++ b/internal/node/config/generate/Config.toml @@ -45,11 +45,11 @@ the snapshot matches the hash in the Application contract.""" # Rollups # -[rollups.CARTESI_EPOCH_DURATION] -default = "86400" # 1 day in seconds -go-type = "Duration" +[rollups.CARTESI_EPOCH_LENGTH] +default = "5760" # 1 day (average) in blocks +go-type = "uint64" description = """ -Duration of a rollups epoch in seconds. +Length of a rollups epoch in blocks. At the end of each epoch, the node will send claims to the blockchain.""" diff --git a/internal/node/config/generated.go b/internal/node/config/generated.go index 3c9ba6138..b3007e9cc 100644 --- a/internal/node/config/generated.go +++ b/internal/node/config/generated.go @@ -473,14 +473,14 @@ func getPostgresEndpoint() string { return val } -func getEpochDuration() Duration { - s, ok := os.LookupEnv("CARTESI_EPOCH_DURATION") +func getEpochLength() uint64 { + s, ok := os.LookupEnv("CARTESI_EPOCH_LENGTH") if !ok { - s = "86400" + s = "5760" } - val, err := toDuration(s) + val, err := toUint64(s) if err != nil { - panic(fmt.Sprintf("failed to parse CARTESI_EPOCH_DURATION: %v", err)) + panic(fmt.Sprintf("failed to parse CARTESI_EPOCH_LENGTH: %v", err)) } return val } diff --git a/offchain/dispatcher/src/drivers/context.rs b/offchain/dispatcher/src/drivers/context.rs index 5c788d5ac..fca5587f0 100644 --- a/offchain/dispatcher/src/drivers/context.rs +++ b/offchain/dispatcher/src/drivers/context.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use crate::{ - machine::{rollups_broker::BrokerFacadeError, BrokerSend, RollupStatus}, + machine::{rollups_broker::BrokerFacadeError, BrokerSend}, metrics::DispatcherMetrics, }; @@ -11,48 +11,53 @@ use types::foldables::Input; #[derive(Debug)] pub struct Context { - inputs_sent_count: u64, - last_event_is_finish_epoch: bool, - last_timestamp: u64, + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, // constants - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, + // metrics dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, } impl Context { pub fn new( - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, - status: RollupStatus, + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, ) -> Self { + assert!(epoch_length > 0); Self { - inputs_sent_count: status.inputs_sent_count, - last_event_is_finish_epoch: status.last_event_is_finish_epoch, - last_timestamp: genesis_timestamp, - genesis_timestamp, + inputs_sent, + last_input_epoch, + last_finished_epoch, + genesis_block, epoch_length, dapp_metadata, metrics, } } - pub fn inputs_sent_count(&self) -> u64 { - self.inputs_sent_count + pub fn inputs_sent(&self) -> u64 { + self.inputs_sent } pub async fn finish_epoch_if_needed( &mut self, - event_timestamp: u64, + block: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - if self.should_finish_epoch(event_timestamp) { - self.finish_epoch(event_timestamp, broker).await?; + let epoch = self.calculate_epoch(block); + if self.should_finish_epoch(epoch) { + self.finish_epoch(broker).await?; } Ok(()) } @@ -62,98 +67,122 @@ impl Context { input: &Input, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - broker.enqueue_input(self.inputs_sent_count, input).await?; + let input_block = input.block_added.number.as_u64(); + self.finish_epoch_if_needed(input_block, broker).await?; + + broker.enqueue_input(self.inputs_sent, input).await?; + self.metrics .advance_inputs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.inputs_sent_count += 1; - self.last_event_is_finish_epoch = false; + + self.inputs_sent += 1; + self.last_input_epoch = + Some(self.calculate_epoch(input.block_added.number.as_u64())); + Ok(()) } } impl Context { - fn calculate_epoch(&self, timestamp: u64) -> u64 { - assert!(timestamp >= self.genesis_timestamp); - (timestamp - self.genesis_timestamp) / self.epoch_length - } - - // This logic works because we call this function with `event_timestamp` being equal to the - // timestamp of each individual input, rather than just the latest from the blockchain. - fn should_finish_epoch(&self, event_timestamp: u64) -> bool { - if self.inputs_sent_count == 0 || self.last_event_is_finish_epoch { - false - } else { - let current_epoch = self.calculate_epoch(self.last_timestamp); - let event_epoch = self.calculate_epoch(event_timestamp); - event_epoch > current_epoch + fn calculate_epoch(&self, block: u64) -> u64 { + assert!(block >= self.genesis_block); + (block - self.genesis_block) / self.epoch_length + } + + fn should_finish_epoch(&self, epoch: u64) -> bool { + match (self.last_input_epoch, self.last_finished_epoch) { + (Some(input), Some(finished)) => assert!(input >= finished), + _ => (), + } + + if self.last_finished_epoch == self.last_input_epoch { + return false; // if the current epoch is empty } + + if epoch == self.last_input_epoch.unwrap() { + return false; // if the current epoch is still not over + } + + epoch > self.last_finished_epoch.unwrap_or(0) } async fn finish_epoch( &mut self, - event_timestamp: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - assert!(event_timestamp >= self.genesis_timestamp); - broker.finish_epoch(self.inputs_sent_count).await?; + broker.finish_epoch(self.inputs_sent).await?; self.metrics .finish_epochs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.last_timestamp = event_timestamp; - self.last_event_is_finish_epoch = true; + + self.last_finished_epoch = self.last_input_epoch; Ok(()) } } #[cfg(test)] -mod private_tests { - use crate::{drivers::mock, metrics::DispatcherMetrics}; +mod tests { + use std::collections::VecDeque; - use super::{Context, DAppMetadata}; + use crate::drivers::mock::Event; + use rollups_events::DAppMetadata; + use serial_test::serial; - // -------------------------------------------------------------------------------------------- - // calculate_epoch_for - // -------------------------------------------------------------------------------------------- + use crate::{drivers::mock, metrics::DispatcherMetrics}; - fn new_context_for_calculate_epoch_test( - genesis_timestamp: u64, - epoch_length: u64, - ) -> Context { - Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 0, - genesis_timestamp, - epoch_length, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), + use super::Context; + + impl Default for Context { + fn default() -> Self { + Context::new( + /* genesis_block */ 0, + /* epoch_length */ 10, + /* dapp_metadata */ DAppMetadata::default(), + /* metrics */ DispatcherMetrics::default(), + /* number_of_inputs_sent */ 0, + /* last_input_epoch */ None, + /* last_finished_epoch */ None, + ) } } + // -------------------------------------------------------------------------------------------- + // calculate_epoch + // -------------------------------------------------------------------------------------------- + #[test] fn calculate_epoch_with_zero_genesis() { - let epoch_length = 3; - let context = new_context_for_calculate_epoch_test(0, epoch_length); - let n = 10; + let mut context = Context::default(); + context.genesis_block = 0; + context.epoch_length = 10; + + let number_of_epochs = 10; let mut tested = 0; - for epoch in 0..n { - let x = epoch * epoch_length; - let y = (epoch + 1) * epoch_length; - for i in x..y { - assert_eq!(context.calculate_epoch(i), epoch); + for current_epoch in 0..number_of_epochs { + let block_lower_bound = current_epoch * context.epoch_length; + let block_upper_bound = (current_epoch + 1) * context.epoch_length; + for i in block_lower_bound..block_upper_bound { + assert_eq!(context.calculate_epoch(i), current_epoch); tested += 1; } } - assert_eq!(tested, n * epoch_length); - assert_eq!(context.calculate_epoch(9), 3); + + assert_eq!(tested, number_of_epochs * context.epoch_length); + assert_eq!( + context.calculate_epoch(context.epoch_length * number_of_epochs), + context.epoch_length + ); } #[test] fn calculate_epoch_with_offset_genesis() { - let context = new_context_for_calculate_epoch_test(2, 2); + let mut context = Context::default(); + context.genesis_block = 2; + context.epoch_length = 2; + assert_eq!(context.calculate_epoch(2), 0); assert_eq!(context.calculate_epoch(3), 0); assert_eq!(context.calculate_epoch(4), 1); @@ -163,68 +192,119 @@ mod private_tests { #[test] #[should_panic] - fn calculate_epoch_invalid() { - new_context_for_calculate_epoch_test(4, 3).calculate_epoch(2); + fn calculate_epoch_should_panic_because_block_came_before_genesis() { + let mut context = Context::default(); + context.genesis_block = 4; + context.epoch_length = 4; + context.calculate_epoch(2); + } + + // -------------------------------------------------------------------------------------------- + // should_finish_epoch -- first epoch + // -------------------------------------------------------------------------------------------- + + #[test] + fn should_finish_the_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_finish_the_first_epoch_after_several_blocks() { + let mut context = Context::default(); + context.inputs_sent = 110; + context.last_input_epoch = Some(9); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(100); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_not_finish_an_empty_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_very_late_empty_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(2340); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(9); + assert_eq!(context.should_finish_epoch(epoch), false); } // -------------------------------------------------------------------------------------------- - // should_finish_epoch + // should_finish_epoch -- other epochs // -------------------------------------------------------------------------------------------- #[test] - fn should_not_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_epoch() { + let mut context = Context::default(); + context.inputs_sent = 42; + context.last_input_epoch = Some(4); + context.last_finished_epoch = Some(3); + let epoch = context.calculate_epoch(54); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_not_finish_epoch_because_of_zero_inputs() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_epoch_by_a_lot() { + let mut context = Context::default(); + context.inputs_sent = 142; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(2); + let epoch = context.calculate_epoch(190); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(context.should_finish_epoch(5)); + fn should_not_finish_an_empty_epoch() { + let mut context = Context::default(); + context.inputs_sent = 120; + context.last_input_epoch = Some(9); + context.last_finished_epoch = Some(9); + let epoch = context.calculate_epoch(105); + assert_eq!(context.should_finish_epoch(epoch), false); } #[test] - fn should_finish_epoch_because_last_event_is_finish_epoch() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: true, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(5)); + fn should_not_finish_a_very_late_empty_epoch() { + let mut context = Context::default(); + context.inputs_sent = 120; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(15); + let epoch = context.calculate_epoch(1000); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_epoch() { + let mut context = Context::default(); + context.inputs_sent = 230; + context.last_input_epoch = Some(11); + context.last_finished_epoch = Some(10); + let epoch = context.calculate_epoch(110); + assert_eq!(context.should_finish_epoch(epoch), false); } // -------------------------------------------------------------------------------------------- @@ -233,72 +313,29 @@ mod private_tests { #[tokio::test] async fn finish_epoch_ok() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let broker = mock::Broker::new(vec![], vec![]); - let timestamp = 6; - let result = context.finish_epoch(timestamp, &broker).await; - assert!(result.is_ok()); - assert_eq!(context.last_timestamp, timestamp); - assert!(context.last_event_is_finish_epoch); - } + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; - #[tokio::test] - #[should_panic] - async fn finish_epoch_invalid() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 6, - genesis_timestamp: 5, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; let broker = mock::Broker::new(vec![], vec![]); - let _ = context.finish_epoch(0, &broker).await; + let result = context.finish_epoch(&broker).await; + assert!(result.is_ok()); + assert_eq!(context.inputs_sent, 1); + assert_eq!(context.last_input_epoch, Some(0)); + assert_eq!(context.last_finished_epoch, Some(0)); } #[tokio::test] async fn finish_epoch_broker_error() { - let last_timestamp = 3; - let last_event_is_finish_epoch = false; - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch, - last_timestamp, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch(6, &broker).await; + let result = context.finish_epoch(&broker).await; assert!(result.is_err()); - assert_eq!(context.last_timestamp, last_timestamp); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch - ); + assert_eq!(context.inputs_sent, 0); + assert_eq!(context.last_input_epoch, None); + assert_eq!(context.last_finished_epoch, None); } -} - -#[cfg(test)] -mod public_tests { - use crate::{ - drivers::mock::{self, SendInteraction}, - machine::RollupStatus, - metrics::DispatcherMetrics, - }; - - use super::{Context, DAppMetadata}; // -------------------------------------------------------------------------------------------- // new @@ -306,26 +343,41 @@ mod public_tests { #[tokio::test] async fn new_ok() { - let genesis_timestamp = 42; + let genesis_block = 42; let epoch_length = 24; - let inputs_sent_count = 150; - let last_event_is_finish_epoch = true; - let rollup_status = RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch, - }; + let number_of_inputs_sent = 150; + let last_input_epoch = Some(14); + let last_finished_epoch = Some(37); + let context = Context::new( - genesis_timestamp, + genesis_block, epoch_length, DAppMetadata::default(), DispatcherMetrics::default(), - rollup_status, + number_of_inputs_sent, + last_input_epoch, + last_finished_epoch, ); - assert_eq!(context.genesis_timestamp, genesis_timestamp); - assert_eq!(context.inputs_sent_count, inputs_sent_count); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch + + assert_eq!(context.genesis_block, genesis_block); + assert_eq!(context.epoch_length, epoch_length); + assert_eq!(context.dapp_metadata, DAppMetadata::default()); + assert_eq!(context.inputs_sent, number_of_inputs_sent); + assert_eq!(context.last_input_epoch, last_input_epoch); + assert_eq!(context.last_finished_epoch, last_finished_epoch); + } + + #[test] + #[should_panic] + fn new_should_panic_because_epoch_length_is_zero() { + Context::new( + 0, + 0, + DAppMetadata::default(), + DispatcherMetrics::default(), + 0, + None, + None, ); } @@ -335,17 +387,10 @@ mod public_tests { #[test] fn inputs_sent_count() { - let inputs_sent_count = 42; - let context = Context { - inputs_sent_count, - last_event_is_finish_epoch: false, // ignored - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert_eq!(context.inputs_sent_count(), inputs_sent_count); + let number_of_inputs_sent = 42; + let mut context = Context::default(); + context.inputs_sent = number_of_inputs_sent; + assert_eq!(context.inputs_sent(), number_of_inputs_sent); } // -------------------------------------------------------------------------------------------- @@ -354,52 +399,40 @@ mod public_tests { #[tokio::test] async fn finish_epoch_if_needed_true() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(12, &broker).await; assert!(result.is_ok()); - broker - .assert_send_interactions(vec![SendInteraction::FinishedEpoch(1)]); + broker.assert_state(vec![ + Event::Finish, // + ]); } #[tokio::test] async fn finish_epoch_if_needed_false() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 2, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(3, &broker).await; + let result = context.finish_epoch_if_needed(9, &broker).await; assert!(result.is_ok()); - broker.assert_send_interactions(vec![]); + broker.assert_state(vec![]); } #[tokio::test] async fn finish_epoch_if_needed_broker_error() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(28, &broker).await; assert!(result.is_err()); } @@ -409,40 +442,215 @@ mod public_tests { #[tokio::test] async fn enqueue_input_ok() { - let inputs_sent_count = 42; - let mut context = Context { - inputs_sent_count, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let input = mock::new_input(2); + let number_of_inputs_sent = 42; + let last_input_epoch = Some(1); + let last_finished_epoch = None; + + let mut context = Context::default(); + context.inputs_sent = number_of_inputs_sent; + context.last_input_epoch = last_input_epoch; + context.last_finished_epoch = last_finished_epoch; + + let input = mock::new_input(22); let broker = mock::Broker::new(vec![], vec![]); let result = context.enqueue_input(&input, &broker).await; assert!(result.is_ok()); - assert_eq!(context.inputs_sent_count, inputs_sent_count + 1); - assert!(!context.last_event_is_finish_epoch); - broker.assert_send_interactions(vec![SendInteraction::EnqueuedInput( - inputs_sent_count, - )]); + + assert_eq!(context.inputs_sent, number_of_inputs_sent + 1); + assert_eq!(context.last_input_epoch, Some(2)); + assert_eq!(context.last_finished_epoch, Some(1)); + + broker.assert_state(vec![ + Event::Finish, + Event::Input(number_of_inputs_sent), + ]); } #[tokio::test] async fn enqueue_input_broker_error() { - let mut context = Context { - inputs_sent_count: 42, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); let broker = mock::Broker::with_enqueue_input_error(); - let result = context.enqueue_input(&mock::new_input(2), &broker).await; + let result = context.enqueue_input(&mock::new_input(82), &broker).await; assert!(result.is_err()); } + + // -------------------------------------------------------------------------------------------- + // deterministic behavior + // -------------------------------------------------------------------------------------------- + + #[derive(Clone)] + struct Case { + input_blocks: Vec, + epoch_length: u64, + last_block: u64, + expected: Vec, + } + + #[tokio::test] + #[serial] + async fn deterministic_behavior() { + let cases: Vec = vec![ + Case { + input_blocks: vec![], + epoch_length: 2, + last_block: 100, + expected: vec![], + }, + Case { + input_blocks: vec![0, 1, 4, 5], + epoch_length: 2, + last_block: 10, + expected: vec![ + Event::Input(0), + Event::Input(1), + Event::Finish, + Event::Input(2), + Event::Input(3), + Event::Finish, + ], + }, + Case { + input_blocks: vec![0, 2], + epoch_length: 2, + last_block: 4, + expected: vec![ + Event::Input(0), + Event::Finish, + Event::Input(1), + Event::Finish, + ], + }, + Case { + input_blocks: vec![1, 2, 4], + epoch_length: 2, + last_block: 6, + expected: vec![ + Event::Input(0), + Event::Finish, + Event::Input(1), + Event::Finish, + Event::Input(2), + Event::Finish, + ], + }, + Case { + input_blocks: vec![0, 1, 2, 3, 4, 5, 6, 7], + epoch_length: 2, + last_block: 7, + expected: vec![ + Event::Input(0), + Event::Input(1), + Event::Finish, + Event::Input(2), + Event::Input(3), + Event::Finish, + Event::Input(4), + Event::Input(5), + Event::Finish, + Event::Input(6), + Event::Input(7), + ], + }, + Case { + input_blocks: vec![0, 5, 9], + epoch_length: 2, + last_block: 10, + expected: vec![ + Event::Input(0), + Event::Finish, + Event::Input(1), + Event::Finish, + Event::Input(2), + Event::Finish, + ], + }, + ]; + for (i, case) in cases.iter().enumerate() { + println!("Testing case {}.", i); + test_deterministic_case(case.clone()).await; + } + } + + // -------------------------------------------------------------------------------------------- + // auxiliary + // -------------------------------------------------------------------------------------------- + + async fn test_deterministic_case(case: Case) { + let broker1 = create_state_as_inputs_are_being_received( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + let broker2 = create_state_by_receiving_all_inputs_at_once( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + broker1.assert_state(case.expected.clone()); + broker2.assert_state(case.expected.clone()); + } + + async fn create_state_as_inputs_are_being_received( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("================================================"); + println!("one_block_at_a_time:"); + + let mut input_blocks: VecDeque<_> = input_blocks.into(); + let mut current_input_block = input_blocks.pop_front(); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in 0..=last_block { + if let Some(input_block) = current_input_block { + if block == input_block { + println!("\tenqueue_input(input_block: {})", block); + let input = mock::new_input(block); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + + current_input_block = input_blocks.pop_front(); + } + } + + println!("\tfinish_epoch_if_needed(block: {})\n", block); + let result = context.finish_epoch_if_needed(block, &broker).await; + assert!(result.is_ok()); + } + + broker + } + + async fn create_state_by_receiving_all_inputs_at_once( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("all_inputs_at_once:"); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in input_blocks { + println!("\tenqueue_input(input_block: {})\n", block); + let input = mock::new_input(block); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + } + + println!("\tfinish_epoch_if_needed(last_block: {})", last_block); + let result = context.finish_epoch_if_needed(last_block, &broker).await; + assert!(result.is_ok()); + + println!("================================================"); + + broker + } } diff --git a/offchain/dispatcher/src/drivers/machine.rs b/offchain/dispatcher/src/drivers/machine.rs index 3f22f97f6..6519cd258 100644 --- a/offchain/dispatcher/src/drivers/machine.rs +++ b/offchain/dispatcher/src/drivers/machine.rs @@ -6,9 +6,9 @@ use super::Context; use crate::machine::{rollups_broker::BrokerFacadeError, BrokerSend}; use eth_state_fold_types::{ethereum_types::Address, Block}; -use types::foldables::{DAppInputBox, Input, InputBox}; +use types::foldables::{DAppInputBox, InputBox}; -use tracing::{debug, instrument, trace}; +use tracing::{debug, instrument}; pub struct MachineDriver { dapp_address: Address, @@ -27,21 +27,17 @@ impl MachineDriver { input_box: &InputBox, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - let dapp_input_box = - match input_box.dapp_input_boxes.get(&self.dapp_address) { - None => { - debug!("No inputs for dapp {}", self.dapp_address); - return Ok(()); - } - - Some(d) => d, - }; - - self.process_inputs(context, dapp_input_box, broker).await?; + match input_box.dapp_input_boxes.get(&self.dapp_address) { + None => { + debug!("No inputs for dapp {}", self.dapp_address); + } + Some(dapp_input_box) => { + self.process_inputs(context, dapp_input_box, broker).await? + } + }; - context - .finish_epoch_if_needed(block.timestamp.as_u64(), broker) - .await?; + let block = block.number.as_u64(); + context.finish_epoch_if_needed(block, broker).await?; Ok(()) } @@ -57,163 +53,63 @@ impl MachineDriver { ) -> Result<(), BrokerFacadeError> { tracing::trace!( "Last input sent to machine manager `{}`, current input `{}`", - context.inputs_sent_count(), + context.inputs_sent(), dapp_input_box.inputs.len() ); - let input_slice = dapp_input_box - .inputs - .skip(context.inputs_sent_count() as usize); + let input_slice = + dapp_input_box.inputs.skip(context.inputs_sent() as usize); for input in input_slice { - self.process_input(context, &input, broker).await?; + context.enqueue_input(&input, broker).await?; } Ok(()) } - - #[instrument(level = "trace", skip_all)] - async fn process_input( - &self, - context: &mut Context, - input: &Input, - broker: &impl BrokerSend, - ) -> Result<(), BrokerFacadeError> { - let input_timestamp = input.block_added.timestamp.as_u64(); - trace!(?context, ?input_timestamp); - - context - .finish_epoch_if_needed(input_timestamp, broker) - .await?; - - context.enqueue_input(input, broker).await?; - - Ok(()) - } } #[cfg(test)] mod tests { - use eth_state_fold_types::{ethereum_types::H160, Block}; - use rollups_events::DAppMetadata; use std::sync::Arc; + use eth_state_fold_types::ethereum_types::H160; + use rollups_events::DAppMetadata; + use types::foldables::InputBox; + use crate::{ drivers::{ - mock::{self, SendInteraction}, + machine::MachineDriver, + mock::{self, Broker}, Context, }, machine::RollupStatus, metrics::DispatcherMetrics, }; - use super::MachineDriver; - - // -------------------------------------------------------------------------------------------- - // process_input - // -------------------------------------------------------------------------------------------- - - async fn test_process_input( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, + fn new_context1( + genesis_block: u64, + epoch_length: u64, + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, + ) -> Context { + Context::new( + genesis_block, + epoch_length, DAppMetadata::default(), DispatcherMetrics::default(), - rollup_status, - ); - let machine_driver = MachineDriver::new(H160::random()); - for block_timestamp in input_timestamps { - let input = mock::new_input(block_timestamp); - let result = machine_driver - .process_input(&mut context, &input, &broker) - .await; - assert!(result.is_ok()); - } - - broker.assert_send_interactions(expected); + inputs_sent, + last_input_epoch, + last_finished_epoch, + ) } - #[tokio::test] - async fn process_input_right_before_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_at_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_last_event_is_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: true, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_after_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![6, 7]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(3), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_crossing_two_epochs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![3, 4, 5, 6, 7, 9, 10, 11]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - SendInteraction::EnqueuedInput(5), - SendInteraction::FinishedEpoch(6), - SendInteraction::EnqueuedInput(6), - SendInteraction::EnqueuedInput(7), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; + fn new_context2( + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, + ) -> Context { + new_context1(0, 10, inputs_sent, last_input_epoch, last_finished_epoch) } // -------------------------------------------------------------------------------------------- @@ -221,23 +117,19 @@ mod tests { // -------------------------------------------------------------------------------------------- async fn test_process_inputs( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, + mut context: Context, + input_blocks: Vec, + expected: Vec, ) { + let rollup_status = RollupStatus { + inputs_sent_count: context.inputs_sent(), + }; let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); let machine_driver = MachineDriver::new(H160::random()); let dapp_input_box = types::foldables::DAppInputBox { - inputs: input_timestamps + inputs: input_blocks .iter() - .map(|timestamp| Arc::new(mock::new_input(*timestamp))) + .map(|block| Arc::new(mock::new_input(*block))) .collect::>() .into(), }; @@ -246,48 +138,36 @@ mod tests { .await; assert!(result.is_ok()); - broker.assert_send_interactions(expected); + broker.assert_state(expected); } #[tokio::test] - async fn test_process_inputs_without_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), + async fn test_process_inputs_without_skipping_inputs() { + let context = new_context2(0, None, None); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::Input(3), ]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + test_process_inputs(context, input_blocks, expected).await; } #[tokio::test] - async fn process_inputs_with_some_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(3)]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + async fn process_inputs_with_some_skipped_inputs() { + let context = new_context2(2, Some(0), None); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![mock::Event::Input(2), mock::Event::Input(3)]; + test_process_inputs(context, input_blocks, expected).await; } #[tokio::test] - async fn process_inputs_skipping_all() { - let rollup_status = RollupStatus { - inputs_sent_count: 4, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + async fn process_inputs_skipping_all_inputs() { + let context = new_context2(4, Some(0), None); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![]; + test_process_inputs(context, input_blocks, expected).await; } // -------------------------------------------------------------------------------------------- @@ -295,123 +175,236 @@ mod tests { // -------------------------------------------------------------------------------------------- async fn test_react( - block: Block, - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - + block: u64, + mut context: Context, + broker: Option, + input_box: Option, + input_blocks: Vec, + expected: Vec, + ) -> (Context, Broker, InputBox) { + let rollup_status = RollupStatus { + inputs_sent_count: context.inputs_sent(), + }; + let broker = broker + .unwrap_or(mock::Broker::new(vec![rollup_status], Vec::new())); let dapp_address = H160::random(); let machine_driver = MachineDriver::new(dapp_address); - let input_box = mock::new_input_box(); + let input_box = input_box.unwrap_or(mock::new_input_box()); let input_box = - mock::update_input_box(input_box, dapp_address, input_timestamps); + mock::update_input_box(input_box, dapp_address, input_blocks); let result = machine_driver - .react(&mut context, &block, &input_box, &broker) + .react(&mut context, &mock::new_block(block), &input_box, &broker) .await; assert!(result.is_ok()); - broker.assert_send_interactions(expected); + broker.assert_state(expected); + + (context, broker, input_box) } #[tokio::test] async fn react_without_finish_epoch() { - let block = mock::new_block(3); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + let block = 3; + let context = new_context2(0, None, None); + let input_blocks = vec![1, 2]; + let expected = vec![mock::Event::Input(0), mock::Event::Input(1)]; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_with_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), + let block = 10; + let context = new_context2(0, None, None); + let input_blocks = vec![1, 2]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::Input(1), + mock::Event::Finish, ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_with_internal_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4, 5]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), + let block = 14; + let context = new_context2(0, None, None); + let input_blocks = vec![9, 10]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::Finish, + mock::Event::Input(1), ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_without_inputs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - let block = mock::new_block(5); - let input_box = mock::new_input_box(); - let machine_driver = MachineDriver::new(H160::random()); - let result = machine_driver - .react(&mut context, &block, &input_box, &broker) - .await; - assert!(result.is_ok()); - broker.assert_send_interactions(vec![]); + let block = 10; + let context = new_context2(0, None, None); + let input_blocks = vec![]; + let expected = vec![]; + test_react(block, context, None, None, input_blocks, expected).await; } + // NOTE: this test shows we DON'T close the epoch after the first input! #[tokio::test] async fn react_with_inputs_after_first_epoch_length() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![7, 8]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), + let block = 20; + let context = new_context2(0, None, None); + let input_blocks = vec![14, 16, 18, 20]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::Finish, + mock::Event::Input(3), + ]; + test_react(block, context, None, None, input_blocks, expected).await; + } + + #[tokio::test] + async fn react_is_deterministic() { + let final_expected = vec![ + mock::Event::Input(0), + mock::Event::Finish, + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::Input(3), + mock::Event::Input(4), + mock::Event::Input(5), + mock::Event::Input(6), + mock::Event::Input(7), + mock::Event::Input(8), + mock::Event::Input(9), + mock::Event::Finish, + mock::Event::Input(10), + mock::Event::Input(11), + mock::Event::Input(12), + mock::Event::Input(13), + mock::Event::Input(14), + mock::Event::Input(15), + mock::Event::Finish, + mock::Event::Input(16), + mock::Event::Input(17), + mock::Event::Input(18), ]; - test_react(block, rollup_status, input_timestamps, send_interactions) + + { + // original + let block1 = 3100; + let block2 = 6944; + + let context = new_context1(0, 1000, 0, None, None); + + let input_blocks1 = vec![ + 56, // + // + 1078, // + 1091, // + 1159, // + 1204, // + 1227, // + 1280, // + 1298, // + 1442, // + 1637, // + // + 2827, // + 2881, // + 2883, // + 2887, // + 2891, // + 2934, // + ]; + let mut input_blocks2 = input_blocks1.clone(); + input_blocks2.append(&mut vec![ + 6160, // + 6864, // + 6944, // + ]); + + let expected1 = vec![ + mock::Event::Input(0), + mock::Event::Finish, + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::Input(3), + mock::Event::Input(4), + mock::Event::Input(5), + mock::Event::Input(6), + mock::Event::Input(7), + mock::Event::Input(8), + mock::Event::Input(9), + mock::Event::Finish, + mock::Event::Input(10), + mock::Event::Input(11), + mock::Event::Input(12), + mock::Event::Input(13), + mock::Event::Input(14), + mock::Event::Input(15), + mock::Event::Finish, + ]; + + let (context, broker, input_box) = test_react( + block1, + context, + None, + None, + input_blocks1, + expected1, + ) + .await; + + test_react( + block2, + context, + Some(broker), + Some(input_box), + input_blocks2, + final_expected.clone(), + ) .await; + } + + { + // reconstruction + let block = 6944; + let context = new_context1(0, 1000, 0, None, None); + let input_blocks = vec![ + 56, // + // + 1078, // + 1091, // + 1159, // + 1204, // + 1227, // + 1280, // + 1298, // + 1442, // + 1637, // + // + 2827, // + 2881, // + 2883, // + 2887, // + 2891, // + 2934, // + // + 6160, // + 6864, // + 6944, // + ]; + test_react( + block, + context, + None, + None, + input_blocks, + final_expected, + ) + .await; + } } } diff --git a/offchain/dispatcher/src/drivers/mock.rs b/offchain/dispatcher/src/drivers/mock.rs index d56acd616..1c39a6a40 100644 --- a/offchain/dispatcher/src/drivers/mock.rs +++ b/offchain/dispatcher/src/drivers/mock.rs @@ -24,21 +24,21 @@ use crate::machine::{ // auxiliary functions // ------------------------------------------------------------------------------------------------ -pub fn new_block(timestamp: u32) -> Block { +pub fn new_block(number: u64) -> Block { Block { hash: H256::random(), - number: 0.into(), + number: number.into(), parent_hash: H256::random(), - timestamp: timestamp.into(), + timestamp: 0.into(), logs_bloom: Bloom::default(), } } -pub fn new_input(timestamp: u32) -> Input { +pub fn new_input(block: u64) -> Input { Input { sender: Arc::new(H160::random()), payload: vec![], - block_added: Arc::new(new_block(timestamp)), + block_added: Arc::new(new_block(block)), dapp: Arc::new(H160::random()), tx_hash: Arc::new(H256::default()), } @@ -55,11 +55,11 @@ pub fn new_input_box() -> InputBox { pub fn update_input_box( input_box: InputBox, dapp_address: Address, - timestamps: Vec, + blocks: Vec, ) -> InputBox { - let inputs = timestamps + let inputs = blocks .iter() - .map(|timestamp| Arc::new(new_input(*timestamp))) + .map(|block| Arc::new(new_input(*block))) .collect::>(); let inputs = Vector::from(inputs); let dapp_input_boxes = input_box @@ -77,16 +77,16 @@ pub fn update_input_box( // ------------------------------------------------------------------------------------------------ #[derive(Debug, Clone, Copy, PartialEq)] -pub enum SendInteraction { - EnqueuedInput(u64), - FinishedEpoch(u64), +pub enum Event { + Input(u64), // input index + Finish, } #[derive(Debug)] pub struct Broker { pub rollup_statuses: Mutex>, pub next_claims: Mutex>, - pub send_interactions: Mutex>, + pub events: Mutex>, status_error: bool, enqueue_input_error: bool, finish_epoch_error: bool, @@ -97,7 +97,7 @@ impl Broker { Self { rollup_statuses: Mutex::new(VecDeque::new()), next_claims: Mutex::new(VecDeque::new()), - send_interactions: Mutex::new(Vec::new()), + events: Mutex::new(Vec::new()), status_error: false, enqueue_input_error: false, finish_epoch_error: false, @@ -126,28 +126,29 @@ impl Broker { broker } - fn send_interactions_len(&self) -> usize { - let mutex_guard = self.send_interactions.lock().unwrap(); + fn events_len(&self) -> usize { + let mutex_guard = self.events.lock().unwrap(); mutex_guard.deref().len() } - fn get_send_interaction(&self, i: usize) -> SendInteraction { - let mutex_guard = self.send_interactions.lock().unwrap(); + fn get_event(&self, i: usize) -> Event { + let mutex_guard = self.events.lock().unwrap(); mutex_guard.deref().get(i).unwrap().clone() } - pub fn assert_send_interactions(&self, expected: Vec) { + pub fn assert_state(&self, expected: Vec) { assert_eq!( - self.send_interactions_len(), + self.events_len(), expected.len(), - "{:?}", - self.send_interactions + "\n{:?}\n{:?}", + self.events.lock().unwrap().deref(), + expected ); - println!("Send interactions:"); + println!("Events:"); for (i, expected) in expected.iter().enumerate() { - let send_interaction = self.get_send_interaction(i); - println!("{:?} - {:?}", send_interaction, expected); - assert_eq!(send_interaction, *expected); + let event = self.get_event(i); + println!("index: {:?} => {:?} - {:?}", i, event, expected); + assert_eq!(event, *expected); } } } @@ -174,25 +175,18 @@ impl BrokerSend for Broker { if self.enqueue_input_error { whatever!("enqueue_input error") } else { - let mut mutex_guard = self.send_interactions.lock().unwrap(); - mutex_guard - .deref_mut() - .push(SendInteraction::EnqueuedInput(input_index)); + let mut mutex_guard = self.events.lock().unwrap(); + mutex_guard.deref_mut().push(Event::Input(input_index)); Ok(()) } } - async fn finish_epoch( - &self, - inputs_sent_count: u64, - ) -> Result<(), BrokerFacadeError> { + async fn finish_epoch(&self, _: u64) -> Result<(), BrokerFacadeError> { if self.finish_epoch_error { whatever!("finish_epoch error") } else { - let mut mutex_guard = self.send_interactions.lock().unwrap(); - mutex_guard - .deref_mut() - .push(SendInteraction::FinishedEpoch(inputs_sent_count)); + let mut mutex_guard = self.events.lock().unwrap(); + mutex_guard.deref_mut().push(Event::Finish); Ok(()) } } diff --git a/offchain/dispatcher/src/machine/mod.rs b/offchain/dispatcher/src/machine/mod.rs index 735fdff23..99a4f2623 100644 --- a/offchain/dispatcher/src/machine/mod.rs +++ b/offchain/dispatcher/src/machine/mod.rs @@ -12,7 +12,6 @@ use self::rollups_broker::BrokerFacadeError; #[derive(Debug, Clone, Copy, Default)] pub struct RollupStatus { pub inputs_sent_count: u64, - pub last_event_is_finish_epoch: bool, } #[async_trait] diff --git a/offchain/dispatcher/src/machine/rollups_broker.rs b/offchain/dispatcher/src/machine/rollups_broker.rs index e430e4fe7..4ba712bfa 100644 --- a/offchain/dispatcher/src/machine/rollups_broker.rs +++ b/offchain/dispatcher/src/machine/rollups_broker.rs @@ -188,18 +188,8 @@ impl BrokerSend for BrokerFacade { impl From for RollupStatus { fn from(payload: RollupsInput) -> Self { - let inputs_sent_count = payload.inputs_sent_count; - - match payload.data { - RollupsData::AdvanceStateInput { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: false, - }, - - RollupsData::FinishEpoch { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: true, - }, + RollupStatus { + inputs_sent_count: payload.inputs_sent_count, } } } @@ -330,7 +320,6 @@ mod broker_facade_tests { let (_fixture, broker) = setup(&docker).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 0); - assert!(!status.last_event_is_finish_epoch); } #[tokio::test] @@ -340,7 +329,6 @@ mod broker_facade_tests { produce_advance_state_inputs(&fixture, 1).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 1); - assert!(!status.last_event_is_finish_epoch); } #[tokio::test] @@ -350,28 +338,6 @@ mod broker_facade_tests { produce_advance_state_inputs(&fixture, 10).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 10); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_is_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 0); - assert!(status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_with_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 5).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 5); - assert!(status.last_event_is_finish_epoch); } // -------------------------------------------------------------------------------------------- diff --git a/offchain/dispatcher/src/setup.rs b/offchain/dispatcher/src/setup.rs index 97f632168..08078a937 100644 --- a/offchain/dispatcher/src/setup.rs +++ b/offchain/dispatcher/src/setup.rs @@ -82,11 +82,11 @@ pub async fn create_context( ) -> Result { let dapp_deployment_block_number = U64::from(config.blockchain_config.dapp_deployment_block_number); - let genesis_timestamp: u64 = block_server + let genesis_block = block_server .query_block(dapp_deployment_block_number) .await .context(StateServerSnafu)? - .timestamp + .number .as_u64(); let epoch_length = config.epoch_duration; @@ -97,11 +97,13 @@ pub async fn create_context( ensure!(status.inputs_sent_count == 0, DirtyBrokerSnafu); let context = Context::new( - genesis_timestamp, + genesis_block, epoch_length, dapp_metadata, metrics, - status, + 0, + None, + None, ); Ok(context)