Skip to content

Commit

Permalink
fixup! feat(offchain): implement the broker-listener for the authorit…
Browse files Browse the repository at this point in the history
…y-claimer
  • Loading branch information
marcelstanley committed Sep 25, 2023
1 parent 36cce0a commit ec200ef
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 214 deletions.
105 changes: 69 additions & 36 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,113 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use async_trait::async_trait;
use rollups_events::RollupsClaim;
use snafu::ResultExt;
use std::fmt::Debug;
use tracing::info;
use tracing::{info, trace};

use crate::{checker::DuplicateChecker, sender::TransactionSender};
use crate::{
checker::DuplicateChecker, listener::BrokerListener,
sender::TransactionSender,
};

/// The `Claimer` sends claims to the blockchain. It checks
/// whether the claim is duplicated before sending.
/// The `Claimer` starts an event loop that waits for claim messages
/// from the broker, and then sends the claims to the blockchain. It checks to
/// see if the claim is duplicated before sending.
///
/// It uses three injected traits, `BrokerListener`, `DuplicateChecker`, and
/// `TransactionSender`, to, respectivelly, listen for messages, check for
/// duplicated claims, and send claims to the blockchain.
#[async_trait]
pub trait Claimer: Sized + Debug {
type Error: snafu::Error + 'static;

async fn send_rollups_claim(
self,
rollups_claim: RollupsClaim,
) -> Result<Self, Self::Error>;
async fn start(mut self) -> Result<(), Self::Error>;
}

#[derive(Debug, snafu::Snafu)]
pub enum ClaimerError<D: DuplicateChecker, T: TransactionSender> {
pub enum ClaimerError<
B: BrokerListener,
D: DuplicateChecker,
T: TransactionSender,
> {
#[snafu(display("duplicated claim error"))]
BrokerListenerError { source: B::Error },

#[snafu(display("duplicated claim error"))]
DuplicatedClaimError { source: D::Error },

#[snafu(display("transaction sender error"))]
TransactionSenderError { source: T::Error },
}

// ------------------------------------------------------------------------------------------------
// AbstractClaimer
// ------------------------------------------------------------------------------------------------

/// The `AbstractClaimer` must be injected with a
/// `DuplicateChecker` and a `TransactionSender`.
/// `BrokerListener`, a `DuplicateChecker` and a `TransactionSender`.
#[derive(Debug)]
pub struct AbstractClaimer<D: DuplicateChecker, T: TransactionSender> {
pub struct AbstractClaimer<
B: BrokerListener,
D: DuplicateChecker,
T: TransactionSender,
> {
broker_listener: B,
duplicate_checker: D,
transaction_sender: T,
}

impl<D: DuplicateChecker, T: TransactionSender> AbstractClaimer<D, T> {
pub fn new(duplicate_checker: D, transaction_sender: T) -> Self {
impl<B: BrokerListener, D: DuplicateChecker, T: TransactionSender>
AbstractClaimer<B, D, T>
{
pub fn new(
broker_listener: B,
duplicate_checker: D,
transaction_sender: T,
) -> Self {
Self {
broker_listener,
duplicate_checker,
transaction_sender,
}
}
}

#[async_trait]
impl<D, T> Claimer for AbstractClaimer<D, T>
impl<B, D, T> Claimer for AbstractClaimer<B, D, T>
where
B: BrokerListener + Send + Sync + 'static,
D: DuplicateChecker + Send + Sync + 'static,
T: TransactionSender + Send + 'static,
{
type Error = ClaimerError<D, T>;
type Error = ClaimerError<B, D, T>;

async fn send_rollups_claim(
mut self,
rollups_claim: RollupsClaim,
) -> Result<Self, Self::Error> {
let is_duplicated_rollups_claim = self
.duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicatedClaimSnafu)?;
if is_duplicated_rollups_claim {
info!("Duplicate claim will not be sent");
return Ok(self);
}
async fn start(mut self) -> Result<(), Self::Error> {
trace!("Starting the authority claimer loop");
loop {
let rollups_claim = self
.broker_listener
.listen()
.await
.context(BrokerListenerSnafu)?;
trace!("Got a claim from the broker: {:?}", rollups_claim);

info!("Sending a new rollups claim");
self.transaction_sender = self
.transaction_sender
.send_rollups_claim_transaction(rollups_claim)
.await
.context(TransactionSenderSnafu)?;
let is_duplicated_rollups_claim = self
.duplicate_checker
.is_duplicated_rollups_claim(&rollups_claim)
.await
.context(DuplicatedClaimSnafu)?;
if is_duplicated_rollups_claim {
trace!("It was a duplicated claim");
continue;
}

Ok(self)
info!("Sending a new rollups claim");
self.transaction_sender = self
.transaction_sender
.send_rollups_claim_transaction(rollups_claim)
.await
.context(TransactionSenderSnafu)?
}
}
}
59 changes: 27 additions & 32 deletions offchain/authority-claimer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use tracing::trace;

use crate::{
checker::DefaultDuplicateChecker,
claimer::AbstractClaimer,
listener::{BrokerListener, DefaultBrokerListener},
claimer::{AbstractClaimer, Claimer},
listener::DefaultBrokerListener,
metrics::AuthorityClaimerMetrics,
sender::DefaultTransactionSender,
};
Expand All @@ -30,40 +30,35 @@ pub async fn run(config: Config) -> Result<(), Box<dyn Error>> {
let http_server_handle =
http_server::start(config.http_server_config, metrics.clone().into());

let claimer_handle = {
let config = config.authority_claimer_config;

let dapp_address = config.dapp_address;
let dapp_metadata = DAppMetadata {
chain_id: config.tx_manager_config.chain_id,
dapp_address,
};

// Creating the duplicate checker.
trace!("Creating the duplicate checker");
let duplicate_checker = DefaultDuplicateChecker::new()?;
let config = config.authority_claimer_config;
let dapp_address = config.dapp_address;
let dapp_metadata = DAppMetadata {
chain_id: config.tx_manager_config.chain_id,
dapp_address,
};

// Creating the transaction sender.
trace!("Creating the transaction sender");
let transaction_sender = DefaultTransactionSender::new(
dapp_metadata.clone(),
metrics.clone(),
)?;
// Creating the broker listener.
trace!("Creating the broker listener");
let broker_listener =
DefaultBrokerListener::new(config.broker_config, dapp_metadata.clone())
.await?;

// Creating the broker listener.
trace!("Creating the broker listener");
let broker_listener =
DefaultBrokerListener::new(config.broker_config, dapp_metadata)
.await?;
// Creating the duplicate checker.
trace!("Creating the duplicate checker");
let duplicate_checker = DefaultDuplicateChecker::new()?;

// Creating the claimer.
trace!("Creating the claimer");
let claimer =
AbstractClaimer::new(duplicate_checker, transaction_sender);
// Creating the transaction sender.
trace!("Creating the transaction sender");
let transaction_sender =
DefaultTransactionSender::new(dapp_metadata, metrics)?;

// Returning the claimer event loop.
broker_listener.start(claimer)
};
// Creating the claimer loop.
let claimer = AbstractClaimer::new(
broker_listener,
duplicate_checker,
transaction_sender,
);
let claimer_handle = claimer.start();

// Starting the HTTP server and the claimer loop.
tokio::select! {
Expand Down
113 changes: 38 additions & 75 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@

use async_trait::async_trait;
use rollups_events::{
Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaimsStream,
INITIAL_ID,
Broker, BrokerConfig, BrokerError, DAppMetadata, RollupsClaim,
RollupsClaimsStream, INITIAL_ID,
};
use snafu::ResultExt;
use std::fmt::Debug;
use tracing::trace;

use crate::claimer::Claimer;

/// The `BrokerListener` starts a perpetual loop that listens for new claims from
/// the broker and sends them to be processed by the injected `Claimer`.
/// The `BrokerListener` listens for new claims from the broker
#[async_trait]
pub trait BrokerListener<C: Claimer> {
pub trait BrokerListener: Debug {
type Error: snafu::Error + 'static;

/// Starts the polling loop.
async fn start(self, claimer: C) -> Result<(), Self::Error>;
/// Listen to claims
async fn listen(&mut self) -> Result<RollupsClaim, Self::Error>;
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -34,12 +30,9 @@ pub struct DefaultBrokerListener {
}

#[derive(Debug, snafu::Snafu)]
pub enum BrokerListenerError<C: Claimer> {
pub enum BrokerListenerError {
#[snafu(display("broker error"))]
BrokerError { source: BrokerError },

#[snafu(display("claimer error"))]
ClaimerError { source: C::Error },
}

impl DefaultBrokerListener {
Expand All @@ -60,32 +53,18 @@ impl DefaultBrokerListener {
}

#[async_trait]
impl<C> BrokerListener<C> for DefaultBrokerListener
where
C: Claimer + Send + 'static,
{
type Error = BrokerListenerError<C>;

async fn start(mut self, mut claimer: C) -> Result<(), Self::Error> {
trace!("Starting the event loop");
loop {
tracing::trace!("Waiting for claim with id {}", self.last_claim_id);
let event = self
.broker
.consume_blocking(&self.stream, &self.last_claim_id)
.await
.context(BrokerSnafu)?;

let rollups_claim = event.payload.clone();
trace!("Got a claim from the broker: {:?}", rollups_claim);
claimer = claimer
.send_rollups_claim(rollups_claim)
.await
.context(ClaimerSnafu)?;
tracing::trace!("Consumed event {:?}", event);

self.last_claim_id = event.id;
}
impl BrokerListener for DefaultBrokerListener {
type Error = BrokerListenerError;

async fn listen(&mut self) -> Result<RollupsClaim, Self::Error> {
tracing::trace!("Waiting for claim with id {}", self.last_claim_id);
let event = self
.broker
.consume_blocking(&self.stream, &self.last_claim_id)
.await
.context(BrokerSnafu)?;

Ok(event.payload.clone())
}
}

Expand Down Expand Up @@ -121,28 +100,35 @@ mod tests {
}

#[tokio::test]
async fn start_broker_listener_with_claims_enqueued() {
async fn start_broker_listener_with_one_claim_enqueued() {
let docker = Cli::default();
let (fixture, broker) = setup(&docker).await;
let (fixture, mut broker_listener) = setup(&docker).await;
let n = 5;
let claimer = mock::Claimer::new(n);
mock::produce_rollups_claims(&fixture, n, 0).await;
mock::produce_last_claim(&fixture, n).await;
let result = broker.start(claimer).await;
mock::assert_broker_listener_ended(result);
let result = broker_listener.listen().await;
assert!(result.is_ok());
}

#[tokio::test]
async fn start_broker_listener_with_claims_enqueued() {
let docker = Cli::default();
let (fixture, mut broker_listener) = setup(&docker).await;
mock::produce_last_claim(&fixture, 0).await;
let claim = broker_listener.listen().await;
assert!(claim.is_ok());
}

#[tokio::test]
async fn start_broker_listener_with_no_claims_enqueued() {
async fn start_broker_listener_listener_with_no_claims_enqueued() {
let docker = Cli::default();
let (fixture, broker) = setup(&docker).await;
let (fixture, mut broker_listener) = setup(&docker).await;
let n = 7;
let claimer = mock::Claimer::new(n);

let broker_listener = tokio::spawn(async move {
let broker_listener_thread = tokio::spawn(async move {
println!("Spawned the broker-listener thread.");
let result = broker.start(claimer).await;
mock::assert_broker_listener_ended(result);
let claim = broker_listener.listen().await;
assert!(claim.is_ok());
});

println!("Going to sleep for 1 second.");
Expand All @@ -162,29 +148,6 @@ mod tests {
assert_eq!(x + y, n);
mock::produce_last_claim(&fixture, n).await;

broker_listener.await.unwrap();
}

#[tokio::test]
async fn start_broker_listener_and_fail_without_consuming_claims() {
let docker = Cli::default();
let (fixture, broker) = setup(&docker).await;
let n = 0;
let claimer = mock::Claimer::new_with_error(n);
mock::produce_last_claim(&fixture, n).await;
let result = broker.start(claimer).await;
mock::assert_broker_listener_failed(result);
}

#[tokio::test]
async fn start_broker_listener_and_fail_after_consuming_some_claims() {
let docker = Cli::default();
let (fixture, broker) = setup(&docker).await;
let n = 5;
let claimer = mock::Claimer::new_with_error(n);
mock::produce_rollups_claims(&fixture, n, 0).await;
mock::produce_last_claim(&fixture, n).await;
let result = broker.start(claimer).await;
mock::assert_broker_listener_failed(result);
broker_listener_thread.await.unwrap();
}
}
Loading

0 comments on commit ec200ef

Please sign in to comment.