From ec200ef502ed3e453cc92eaeae896f5470af1783 Mon Sep 17 00:00:00 2001 From: Marcel Moura Date: Mon, 18 Sep 2023 22:16:38 -0300 Subject: [PATCH] fixup! feat(offchain): implement the broker-listener for the authority-claimer --- offchain/authority-claimer/src/claimer.rs | 105 ++++++++++++------- offchain/authority-claimer/src/lib.rs | 59 +++++------ offchain/authority-claimer/src/listener.rs | 113 +++++++-------------- offchain/authority-claimer/src/mock.rs | 78 ++------------ 4 files changed, 141 insertions(+), 214 deletions(-) diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index d9b9040a1..98872eb59 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs @@ -2,27 +2,38 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use async_trait::async_trait; -use rollups_events::RollupsClaim; use snafu::ResultExt; use std::fmt::Debug; -use tracing::info; +use tracing::{info, trace}; -use crate::{checker::DuplicateChecker, sender::TransactionSender}; +use crate::{ + checker::DuplicateChecker, listener::BrokerListener, + sender::TransactionSender, +}; -/// The `Claimer` sends claims to the blockchain. It checks -/// whether the claim is duplicated before sending. +/// The `Claimer` starts an event loop that waits for claim messages +/// from the broker, and then sends the claims to the blockchain. It checks to +/// see if the claim is duplicated before sending. +/// +/// It uses three injected traits, `BrokerListener`, `DuplicateChecker`, and +/// `TransactionSender`, to, respectivelly, listen for messages, check for +/// duplicated claims, and send claims to the blockchain. #[async_trait] pub trait Claimer: Sized + Debug { type Error: snafu::Error + 'static; - async fn send_rollups_claim( - self, - rollups_claim: RollupsClaim, - ) -> Result; + async fn start(mut self) -> Result<(), Self::Error>; } #[derive(Debug, snafu::Snafu)] -pub enum ClaimerError { +pub enum ClaimerError< + B: BrokerListener, + D: DuplicateChecker, + T: TransactionSender, +> { + #[snafu(display("duplicated claim error"))] + BrokerListenerError { source: B::Error }, + #[snafu(display("duplicated claim error"))] DuplicatedClaimError { source: D::Error }, @@ -30,17 +41,33 @@ pub enum ClaimerError { TransactionSenderError { source: T::Error }, } +// ------------------------------------------------------------------------------------------------ +// AbstractClaimer +// ------------------------------------------------------------------------------------------------ + /// The `AbstractClaimer` must be injected with a -/// `DuplicateChecker` and a `TransactionSender`. +/// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`. #[derive(Debug)] -pub struct AbstractClaimer { +pub struct AbstractClaimer< + B: BrokerListener, + D: DuplicateChecker, + T: TransactionSender, +> { + broker_listener: B, duplicate_checker: D, transaction_sender: T, } -impl AbstractClaimer { - pub fn new(duplicate_checker: D, transaction_sender: T) -> Self { +impl + AbstractClaimer +{ + pub fn new( + broker_listener: B, + duplicate_checker: D, + transaction_sender: T, + ) -> Self { Self { + broker_listener, duplicate_checker, transaction_sender, } @@ -48,34 +75,40 @@ impl AbstractClaimer { } #[async_trait] -impl Claimer for AbstractClaimer +impl Claimer for AbstractClaimer where + B: BrokerListener + Send + Sync + 'static, D: DuplicateChecker + Send + Sync + 'static, T: TransactionSender + Send + 'static, { - type Error = ClaimerError; + type Error = ClaimerError; - async fn send_rollups_claim( - mut self, - rollups_claim: RollupsClaim, - ) -> Result { - let is_duplicated_rollups_claim = self - .duplicate_checker - .is_duplicated_rollups_claim(&rollups_claim) - .await - .context(DuplicatedClaimSnafu)?; - if is_duplicated_rollups_claim { - info!("Duplicate claim will not be sent"); - return Ok(self); - } + async fn start(mut self) -> Result<(), Self::Error> { + trace!("Starting the authority claimer loop"); + loop { + let rollups_claim = self + .broker_listener + .listen() + .await + .context(BrokerListenerSnafu)?; + trace!("Got a claim from the broker: {:?}", rollups_claim); - info!("Sending a new rollups claim"); - self.transaction_sender = self - .transaction_sender - .send_rollups_claim_transaction(rollups_claim) - .await - .context(TransactionSenderSnafu)?; + let is_duplicated_rollups_claim = self + .duplicate_checker + .is_duplicated_rollups_claim(&rollups_claim) + .await + .context(DuplicatedClaimSnafu)?; + if is_duplicated_rollups_claim { + trace!("It was a duplicated claim"); + continue; + } - Ok(self) + info!("Sending a new rollups claim"); + self.transaction_sender = self + .transaction_sender + .send_rollups_claim_transaction(rollups_claim) + .await + .context(TransactionSenderSnafu)? + } } } diff --git a/offchain/authority-claimer/src/lib.rs b/offchain/authority-claimer/src/lib.rs index d057cde67..4e06d4f85 100644 --- a/offchain/authority-claimer/src/lib.rs +++ b/offchain/authority-claimer/src/lib.rs @@ -18,8 +18,8 @@ use tracing::trace; use crate::{ checker::DefaultDuplicateChecker, - claimer::AbstractClaimer, - listener::{BrokerListener, DefaultBrokerListener}, + claimer::{AbstractClaimer, Claimer}, + listener::DefaultBrokerListener, metrics::AuthorityClaimerMetrics, sender::DefaultTransactionSender, }; @@ -30,40 +30,35 @@ pub async fn run(config: Config) -> Result<(), Box> { let http_server_handle = http_server::start(config.http_server_config, metrics.clone().into()); - let claimer_handle = { - let config = config.authority_claimer_config; - - let dapp_address = config.dapp_address; - let dapp_metadata = DAppMetadata { - chain_id: config.tx_manager_config.chain_id, - dapp_address, - }; - - // Creating the duplicate checker. - trace!("Creating the duplicate checker"); - let duplicate_checker = DefaultDuplicateChecker::new()?; + let config = config.authority_claimer_config; + let dapp_address = config.dapp_address; + let dapp_metadata = DAppMetadata { + chain_id: config.tx_manager_config.chain_id, + dapp_address, + }; - // Creating the transaction sender. - trace!("Creating the transaction sender"); - let transaction_sender = DefaultTransactionSender::new( - dapp_metadata.clone(), - metrics.clone(), - )?; + // Creating the broker listener. + trace!("Creating the broker listener"); + let broker_listener = + DefaultBrokerListener::new(config.broker_config, dapp_metadata.clone()) + .await?; - // Creating the broker listener. - trace!("Creating the broker listener"); - let broker_listener = - DefaultBrokerListener::new(config.broker_config, dapp_metadata) - .await?; + // Creating the duplicate checker. + trace!("Creating the duplicate checker"); + let duplicate_checker = DefaultDuplicateChecker::new()?; - // Creating the claimer. - trace!("Creating the claimer"); - let claimer = - AbstractClaimer::new(duplicate_checker, transaction_sender); + // Creating the transaction sender. + trace!("Creating the transaction sender"); + let transaction_sender = + DefaultTransactionSender::new(dapp_metadata, metrics)?; - // Returning the claimer event loop. - broker_listener.start(claimer) - }; + // Creating the claimer loop. + let claimer = AbstractClaimer::new( + broker_listener, + duplicate_checker, + transaction_sender, + ); + let claimer_handle = claimer.start(); // Starting the HTTP server and the claimer loop. tokio::select! { diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 29dd2c98c..d3576471a 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -3,23 +3,19 @@ use async_trait::async_trait; use rollups_events::{ - Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaimsStream, - INITIAL_ID, + Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaim, + RollupsClaimsStream, INITIAL_ID, }; use snafu::ResultExt; use std::fmt::Debug; -use tracing::trace; -use crate::claimer::Claimer; - -/// The `BrokerListener` starts a perpetual loop that listens for new claims from -/// the broker and sends them to be processed by the injected `Claimer`. +/// The `BrokerListener` listens for new claims from the broker #[async_trait] -pub trait BrokerListener { +pub trait BrokerListener: Debug { type Error: snafu::Error + 'static; - /// Starts the polling loop. - async fn start(self, claimer: C) -> Result<(), Self::Error>; + /// Listen to claims + async fn listen(&mut self) -> Result; } // ------------------------------------------------------------------------------------------------ @@ -34,12 +30,9 @@ pub struct DefaultBrokerListener { } #[derive(Debug, snafu::Snafu)] -pub enum BrokerListenerError { +pub enum BrokerListenerError { #[snafu(display("broker error"))] BrokerError { source: BrokerError }, - - #[snafu(display("claimer error"))] - ClaimerError { source: C::Error }, } impl DefaultBrokerListener { @@ -60,32 +53,18 @@ impl DefaultBrokerListener { } #[async_trait] -impl BrokerListener for DefaultBrokerListener -where - C: Claimer + Send + 'static, -{ - type Error = BrokerListenerError; - - async fn start(mut self, mut claimer: C) -> Result<(), Self::Error> { - trace!("Starting the event loop"); - loop { - tracing::trace!("Waiting for claim with id {}", self.last_claim_id); - let event = self - .broker - .consume_blocking(&self.stream, &self.last_claim_id) - .await - .context(BrokerSnafu)?; - - let rollups_claim = event.payload.clone(); - trace!("Got a claim from the broker: {:?}", rollups_claim); - claimer = claimer - .send_rollups_claim(rollups_claim) - .await - .context(ClaimerSnafu)?; - tracing::trace!("Consumed event {:?}", event); - - self.last_claim_id = event.id; - } +impl BrokerListener for DefaultBrokerListener { + type Error = BrokerListenerError; + + async fn listen(&mut self) -> Result { + tracing::trace!("Waiting for claim with id {}", self.last_claim_id); + let event = self + .broker + .consume_blocking(&self.stream, &self.last_claim_id) + .await + .context(BrokerSnafu)?; + + Ok(event.payload.clone()) } } @@ -121,28 +100,35 @@ mod tests { } #[tokio::test] - async fn start_broker_listener_with_claims_enqueued() { + async fn start_broker_listener_with_one_claim_enqueued() { let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; + let (fixture, mut broker_listener) = setup(&docker).await; let n = 5; - let claimer = mock::Claimer::new(n); mock::produce_rollups_claims(&fixture, n, 0).await; mock::produce_last_claim(&fixture, n).await; - let result = broker.start(claimer).await; - mock::assert_broker_listener_ended(result); + let result = broker_listener.listen().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn start_broker_listener_with_claims_enqueued() { + let docker = Cli::default(); + let (fixture, mut broker_listener) = setup(&docker).await; + mock::produce_last_claim(&fixture, 0).await; + let claim = broker_listener.listen().await; + assert!(claim.is_ok()); } #[tokio::test] - async fn start_broker_listener_with_no_claims_enqueued() { + async fn start_broker_listener_listener_with_no_claims_enqueued() { let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; + let (fixture, mut broker_listener) = setup(&docker).await; let n = 7; - let claimer = mock::Claimer::new(n); - let broker_listener = tokio::spawn(async move { + let broker_listener_thread = tokio::spawn(async move { println!("Spawned the broker-listener thread."); - let result = broker.start(claimer).await; - mock::assert_broker_listener_ended(result); + let claim = broker_listener.listen().await; + assert!(claim.is_ok()); }); println!("Going to sleep for 1 second."); @@ -162,29 +148,6 @@ mod tests { assert_eq!(x + y, n); mock::produce_last_claim(&fixture, n).await; - broker_listener.await.unwrap(); - } - - #[tokio::test] - async fn start_broker_listener_and_fail_without_consuming_claims() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - let n = 0; - let claimer = mock::Claimer::new_with_error(n); - mock::produce_last_claim(&fixture, n).await; - let result = broker.start(claimer).await; - mock::assert_broker_listener_failed(result); - } - - #[tokio::test] - async fn start_broker_listener_and_fail_after_consuming_some_claims() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - let n = 5; - let claimer = mock::Claimer::new_with_error(n); - mock::produce_rollups_claims(&fixture, n, 0).await; - mock::produce_last_claim(&fixture, n).await; - let result = broker.start(claimer).await; - mock::assert_broker_listener_failed(result); + broker_listener_thread.await.unwrap(); } } diff --git a/offchain/authority-claimer/src/mock.rs b/offchain/authority-claimer/src/mock.rs index a52b25c72..42da4450b 100644 --- a/offchain/authority-claimer/src/mock.rs +++ b/offchain/authority-claimer/src/mock.rs @@ -1,90 +1,26 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + use std::time::Duration; -use async_trait::async_trait; use backoff::ExponentialBackoffBuilder; use rollups_events::{ BrokerConfig, BrokerEndpoint, BrokerError, DAppMetadata, RedactedUrl, RollupsClaim, Url, }; -use snafu::{OptionExt, Snafu}; +use snafu::Snafu; use test_fixtures::BrokerFixture; use testcontainers::clients::Cli; -use crate::{ - claimer, - listener::{BrokerListenerError, DefaultBrokerListener}, -}; - -#[derive(Debug)] -pub struct Claimer { - results: Vec>, -} +use crate::listener::DefaultBrokerListener; #[derive(Clone, Debug, Snafu)] -pub enum ClaimerError { +pub enum MockError { EndError, InternalError, MockError, } -impl Claimer { - /// Creates a `Claimer` that proccesses `n` claims before returning - /// the `ClaimerError::EndError` error. - pub fn new(n: usize) -> Self { - let mut results: Vec> = vec![Ok(()); n]; - results.insert(0, Err(ClaimerError::EndError)); - Self { results } - } - - /// Creates a `Claimer` that proccesses `n` claims before returning - /// the `ClaimerError::MockError` error. - pub fn new_with_error(n: usize) -> Self { - let mut results: Vec> = vec![Ok(()); n]; - results.insert(0, Err(ClaimerError::MockError)); - Self { results } - } -} - -#[async_trait] -impl claimer::Claimer for Claimer { - type Error = ClaimerError; - - async fn send_rollups_claim( - mut self, - _: RollupsClaim, - ) -> Result { - let length = self.results.len() - 1; - println!("The mock claimer is consuming claim {}.", length); - self.results.pop().context(InternalSnafu)?.map(|_| self) - } -} - -pub fn assert_broker_listener_ended( - result: Result<(), BrokerListenerError>, -) { - assert!(result.is_err()); - match result { - Ok(_) => panic!("broker listener returned with Ok(())"), - Err(BrokerListenerError::ClaimerError { source }) => { - assert_eq!(source.to_string(), ClaimerError::EndError.to_string()) - } - Err(err) => panic!("broker listener failed with error {:?}", err), - } -} - -pub fn assert_broker_listener_failed( - result: Result<(), BrokerListenerError>, -) { - assert!(result.is_err()); - match result { - Ok(_) => panic!("broker listener returned with Ok(())"), - Err(BrokerListenerError::ClaimerError { source }) => { - assert_eq!(source.to_string(), ClaimerError::MockError.to_string()) - } - Err(err) => panic!("broker listener failed with error {:?}", err), - } -} - pub async fn setup_broker( docker: &Cli, should_fail: bool, @@ -130,7 +66,7 @@ pub async fn produce_rollups_claims( rollups_claims } -/// The last claim should trigger the `ClaimerError::EndError` error. +/// The last claim should trigger an `EndError` error. pub async fn produce_last_claim( fixture: &BrokerFixture<'_>, epoch_index: usize,