diff --git a/offchain/advance-runner/src/broker.rs b/offchain/advance-runner/src/broker.rs index 1993b6e19..beb01bf4a 100644 --- a/offchain/advance-runner/src/broker.rs +++ b/offchain/advance-runner/src/broker.rs @@ -13,6 +13,13 @@ pub enum BrokerFacadeError { #[snafu(display("broker internal error"))] BrokerInternalError { source: BrokerError }, + #[snafu(display( + "expected first_index from claim to be {}, but got {}", + expected, + got + ))] + InvalidIndexes { expected: u128, got: u128 }, + #[snafu(display("failed to consume input event"))] ConsumeError { source: BrokerError }, @@ -101,24 +108,40 @@ impl BrokerFacade { "producing rollups claim" ); - let result = self + let last_claim_event = self .client .peek_latest(&self.claims_stream) .await .context(BrokerInternalSnafu)?; - let claim_produced = match result { + let should_enqueue_claim = match last_claim_event { Some(event) => { - tracing::trace!(?event, "got last claim produced"); - rollups_claim.epoch_index <= event.payload.epoch_index + let last_claim = event.payload; + tracing::trace!(?last_claim, "got last claim from Redis"); + let should_enqueue_claim = + rollups_claim.epoch_index > last_claim.epoch_index; + + // If this happens, then something is wrong with the dispatcher. + let invalid_indexes = + rollups_claim.first_index != last_claim.last_index + 1; + if should_enqueue_claim && invalid_indexes { + tracing::debug!("rollups_claim.first_index = {}, last_claim.last_index = {}", + rollups_claim.first_index, last_claim.last_index); + return Err(BrokerFacadeError::InvalidIndexes { + expected: last_claim.last_index + 1, + got: rollups_claim.first_index, + }); + }; + + should_enqueue_claim } None => { tracing::trace!("no claims in the stream"); - false + true } }; - if !claim_produced { + if should_enqueue_claim { self.client .produce(&self.claims_stream, rollups_claim) .await @@ -300,4 +323,78 @@ mod tests { vec![rollups_claim0, rollups_claim1] ); } + + #[test_log::test(tokio::test)] + async fn test_invalid_indexes_overlapping() { + let docker = Cli::default(); + let mut state = TestState::setup(&docker).await; + let rollups_claim1 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 0, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 0, + last_index: 6, + }; + let rollups_claim2 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 1, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 6, + last_index: 7, + }; + state + .fixture + .produce_rollups_claim(rollups_claim1.clone()) + .await; + let result = state + .facade + .produce_rollups_claim(rollups_claim2.clone()) + .await; + assert!(result.is_err()); + assert_eq!( + BrokerFacadeError::InvalidIndexes { + expected: 7, + got: 6 + } + .to_string(), + result.unwrap_err().to_string() + ) + } + + #[test_log::test(tokio::test)] + async fn test_invalid_indexes_nonsequential() { + let docker = Cli::default(); + let mut state = TestState::setup(&docker).await; + let rollups_claim1 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 0, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 0, + last_index: 6, + }; + let rollups_claim2 = RollupsClaim { + dapp_address: Address::new([0xa0; ADDRESS_SIZE]), + epoch_index: 1, + epoch_hash: Hash::new([0xb0; HASH_SIZE]), + first_index: 11, + last_index: 14, + }; + state + .fixture + .produce_rollups_claim(rollups_claim1.clone()) + .await; + let result = state + .facade + .produce_rollups_claim(rollups_claim2.clone()) + .await; + assert!(result.is_err()); + assert_eq!( + BrokerFacadeError::InvalidIndexes { + expected: 7, + got: 11 + } + .to_string(), + result.unwrap_err().to_string() + ) + } } diff --git a/offchain/authority-claimer/src/checker.rs b/offchain/authority-claimer/src/checker.rs index 3ca293574..997972967 100644 --- a/offchain/authority-claimer/src/checker.rs +++ b/offchain/authority-claimer/src/checker.rs @@ -128,13 +128,13 @@ impl DuplicateChecker for DefaultDuplicateChecker { .map(|claim| claim.last_index + 1) // Maps to a number .unwrap_or(0); // If None, unwrap to 0 if rollups_claim.first_index == expected_first_index { - // This claim is the one the blockchain expects, so it is not considered duplicate. + // This claim is the one the blockchain expects, so it is not considered a duplicate. Ok(false) } else if rollups_claim.last_index < expected_first_index { // This claim is already on the blockchain. Ok(true) } else { - // This claim is not on blockchain, but it isn't the one blockchain expects. + // This claim is not on the blockchain, but it isn't the one the blockchain expects. // If this happens, there is a bug on the dispatcher. Err(DuplicateCheckerError::ClaimMismatch { expected_first_index, diff --git a/offchain/dispatcher/src/drivers/context.rs b/offchain/dispatcher/src/drivers/context.rs index a8063bbcf..04180ea5a 100644 --- a/offchain/dispatcher/src/drivers/context.rs +++ b/offchain/dispatcher/src/drivers/context.rs @@ -65,6 +65,13 @@ impl Context { broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { let input_block_number = input.block_added.number.as_u64(); + let input_epoch = self.calculate_epoch(input_block_number); + self.last_finished_epoch.map(|last_finished_epoch| { + // Asserting that the calculated epoch comes after the last finished epoch. + // (If last_finished_epoch == None then we don't need the assertion.) + assert!(input_epoch > last_finished_epoch) + }); + self.finish_epoch_if_needed(input_block_number, broker) .await?; @@ -76,13 +83,6 @@ impl Context { .inc(); self.inputs_sent += 1; - - let input_epoch = self.calculate_epoch(input_block_number); - self.last_finished_epoch.map(|last_finished_epoch| { - // Asserting that the calculated epoch comes after the last finished epoch. - // (If last_finished_epoch == None then we don't need the assertion.) - assert!(input_epoch > last_finished_epoch) - }); self.last_input_epoch = Some(input_epoch); Ok(()) diff --git a/offchain/dispatcher/src/drivers/machine.rs b/offchain/dispatcher/src/drivers/machine.rs index f899aae51..ecab9a3d8 100644 --- a/offchain/dispatcher/src/drivers/machine.rs +++ b/offchain/dispatcher/src/drivers/machine.rs @@ -36,8 +36,9 @@ impl MachineDriver { } }; - let block = block.number.as_u64(); - context.finish_epoch_if_needed(block, broker).await?; + let block_number = block.number.as_u64(); + tracing::debug!("reacting to standalone block {}", block_number); + context.finish_epoch_if_needed(block_number, broker).await?; Ok(()) } diff --git a/offchain/dispatcher/src/machine/rollups_broker.rs b/offchain/dispatcher/src/machine/rollups_broker.rs index a2f729766..99daa425d 100644 --- a/offchain/dispatcher/src/machine/rollups_broker.rs +++ b/offchain/dispatcher/src/machine/rollups_broker.rs @@ -185,10 +185,10 @@ impl BrokerSend for BrokerFacade { tracing::info!(?inputs_sent_count, "finishing epoch"); let mut broker = self.broker.lock().await; - let status = self.broker_status(&mut broker).await?; + let status = self.broker_status(&mut broker).await?; // Epoch number gets incremented here! let event = build_next_finish_epoch(&status); - tracing::trace!(?event, "producing finish epoch event"); + tracing::info!(?event, "producing finish epoch event"); epoch_sanity_check!(event, inputs_sent_count);