Skip to content

Commit

Permalink
refactor(message relayer): Use sails-generated client to decode `br…
Browse files Browse the repository at this point in the history
…idging-payment` events (#120)
  • Loading branch information
mertwole committed Sep 4, 2024
1 parent a8a7854 commit 9025d49
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 55 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ axum = "0.7.5"
bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
blake2 = "0.10.6"
bytes = "1.6.0"
clap = { version = "4.4.13", features = ["derive", "env"] }
cgo_oligami = "0.3"
circular-buffer = { version = "0.1.7", default-features = false, features = [
"alloc",
] }
clap = { version = "4.4.13", features = ["derive", "env"] }
derive_more = "0.99.17"
dotenv = "0.15.0"
env_logger = "0.9.0"
Expand All @@ -73,7 +74,7 @@ ethereum-types = { version = "0.14.1", default-features = false, features = [
"codec",
"rlp",
] }
extended_vft_wasm = { git = "https://github.com/gear-foundation/standards/", branch = "gstd-pinned-v1.5.0"}
extended_vft_wasm = { git = "https://github.com/gear-foundation/standards/", branch = "gstd-pinned-v1.5.0" }
ff = { version = "0.13.0", features = ["derive"] }
futures = { version = "0.3.30", features = ["executor"] }
futures-util = "0.3.28"
Expand Down Expand Up @@ -129,7 +130,7 @@ gsdk = "=1.5.0"
gclient = "=1.5.0"
gear-core = "=1.5.0"
gbuiltin-bls381 = { git = "https://github.com/gear-tech/gear.git", tag = "v1.5.0" }
gbuiltin-eth-bridge = { git = "https://github.com/gear-tech/gear.git", tag = "v1.5.0" }
gbuiltin-eth-bridge = { git = "https://github.com/gear-tech/gear.git", tag = "v1.5.0" }
pallet-gear-eth-bridge-rpc-runtime-api = { git = "https://github.com/gear-tech/gear.git", tag = "v1.5.0", default-features = false, features = [
"std",
] }
Expand Down Expand Up @@ -173,4 +174,4 @@ alloy = { version = "0.2.0", package = "alloy", features = [
] }

[patch.crates-io]
gsys = { git = "https://github.com/gear-tech/gear.git", tag = "v1.5.0" }
gsys = { git = "https://github.com/gear-tech/gear.git", tag = "v1.5.0" }
49 changes: 21 additions & 28 deletions gear-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use gsdk::{
metadata::{
gear::Event as GearEvent,
gear_eth_bridge::Event as GearBridgeEvent,
runtime_types::{
gear_core::message::user::UserMessage, gear_core_errors::simple::ReplyCode,
gprimitives::ActorId,
},
runtime_types::{gear_core::message::user::UserMessage, gprimitives::ActorId},
storage::{GrandpaStorage, SessionStorage},
vara_runtime::SessionKeys,
},
Expand Down Expand Up @@ -598,39 +595,35 @@ impl GearApi {
pub async fn user_message_sent_events(
&self,
from_program: H256,
to_user: H256,
block: H256,
) -> anyhow::Result<Vec<dto::UserMessageSent>> {
let events = self.api.get_events_at(Some(block)).await?;

let from = ActorId(from_program.0);

let events = events.into_iter().filter_map(|event| {
let (source, payload, details) =
if let RuntimeEvent::Gear(GearEvent::UserMessageSent {
message:
UserMessage {
source,
payload,
details,
..
},
..
}) = event
{
(source, payload, details?)
} else {
return None;
};

if source != from {
let RuntimeEvent::Gear(GearEvent::UserMessageSent {
message:
UserMessage {
source,
destination,
payload,
..
},
..
}) = event
else {
return None;
};

if source != ActorId(from_program.0) {
return None;
}

if let ReplyCode::Success(_) = details.code {
Some(dto::UserMessageSent { payload: payload.0 })
} else {
None
if destination != ActorId(to_user.0) {
return None;
}

Some(dto::UserMessageSent { payload: payload.0 })
});

Ok(events.collect())
Expand Down
4 changes: 3 additions & 1 deletion relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ primitive-types = { workspace = true, features = ["std"] }
prometheus.workspace = true
rand.workspace = true
reqwest.workspace = true
sails-rs.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
Expand All @@ -41,4 +42,5 @@ tokio.workspace = true
utils-prometheus.workspace = true

[build-dependencies]
cgo_oligami = "0.3"
cgo_oligami.workspace = true
sails-client-gen.workspace = true
25 changes: 24 additions & 1 deletion relayer/build.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
use sails_client_gen::ClientGenerator;
use std::{env, path::PathBuf};

fn main() {
go_bindings();
bridging_payment_client();
}

fn go_bindings() {
println!("cargo:rerun-if-changed=../gnark-wrapper/main.go");

cgo_oligami::Build::new()
.build_mode(cgo_oligami::BuildMode::CArchive)
.change_dir("./../gnark-wrapper")
.package("main.go")
.build("gnark_wrapper");
}

println!("cargo:rerun-if-changed=../gnark-wrapper/main.go");
fn bridging_payment_client() {
println!(
"cargo:rerun-if-changed=../gear-programs/bridging-payment/src/wasm/bridging-payment.idl"
);

let out_dir_path = PathBuf::from(env::var("OUT_DIR").unwrap());
let idl_file_path =
PathBuf::from("../gear-programs/bridging-payment/src/wasm/bridging-payment.idl");
let client_rs_file_path = out_dir_path.join("bridging_payment_client.rs");

ClientGenerator::from_idl_path(&idl_file_path)
.generate_to(client_rs_file_path)
.unwrap();
}
47 changes: 32 additions & 15 deletions relayer/src/message_relayer/common/message_paid_event_extractor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
use std::sync::mpsc::{channel, Receiver, Sender};

use bridging_payment::services::BridgingPaymentEvents;
use futures::executor::block_on;
use gear_rpc_client::GearApi;
use parity_scale_codec::Decode;
use primitive_types::H256;
use prometheus::IntCounter;
use sails_rs::events::EventIo;
use utils_prometheus::{impl_metered_service, MeteredService};

use super::{GearBlockNumber, PaidMessage};

#[allow(dead_code)]
mod bridging_payment_client {
use sails_rs::prelude::*;

include!(concat!(env!("OUT_DIR"), "/bridging_payment_client.rs"));
}

use bridging_payment_client::bridging_payment::events::BridgingPaymentEvents;

pub struct MessagePaidEventExtractor {
bridging_payment_address: H256,

Expand Down Expand Up @@ -86,25 +94,34 @@ impl MessagePaidEventExtractor {
) -> anyhow::Result<()> {
let block_hash = self.gear_api.block_number_to_hash(block).await?;

// As bridging-payment uses sails to send events, destnation will be zeroed.
let destination = H256::zero();

let messages = self
.gear_api
.user_message_sent_events(self.bridging_payment_address, block_hash)
.user_message_sent_events(self.bridging_payment_address, destination, block_hash)
.await?;
if !messages.is_empty() {
log::info!("Found {} paid messages at block #{}", messages.len(), block);
self.metrics
.total_messages_found
.inc_by(messages.len() as u64);

for message in messages {
let user_reply = BridgingPaymentEvents::decode(&mut &message.payload[..])?;
let BridgingPaymentEvents::TeleportVaraToEth { nonce, .. } = user_reply;
if messages.is_empty() {
return Ok(());
}

let mut nonce_le = [0; 32];
nonce.to_little_endian(&mut nonce_le);
log::info!("Found {} paid messages at block #{}", messages.len(), block);

sender.send(PaidMessage { nonce: nonce_le })?;
}
self.metrics
.total_messages_found
.inc_by(messages.len() as u64);

for message in messages {
let user_reply = BridgingPaymentEvents::decode_event(message.payload)
.map_err(|_| anyhow::anyhow!("Failed to decode bridging payment event"))?;

let BridgingPaymentEvents::TeleportVaraToEth { nonce, .. } = user_reply;

let mut nonce_le = [0; 32];
nonce.to_little_endian(&mut nonce_le);

sender.send(PaidMessage { nonce: nonce_le })?;
}

Ok(())
Expand Down
14 changes: 8 additions & 6 deletions relayer/src/message_relayer/common/paid_messages_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ impl PaidMessagesFilter {
) -> anyhow::Result<()> {
loop {
for message in messages.try_iter() {
self.pending_messages
if let Some(msg) = self
.pending_messages
.insert(message.message.nonce_le, message)
.expect("Received 2 messages with the same nonce");
{
panic!(
"Received 2 messages with the same nonce: {}",
hex::encode(msg.message.nonce_le)
);
}
}

for PaidMessage { nonce } in paid_messages.try_iter() {
Expand All @@ -95,10 +101,6 @@ impl PaidMessagesFilter {
}
}

if !self.pending_nonces.is_empty() {
log::warn!("Discovered message that was paid but it's contents haven't discovered");
}

self.metrics
.pending_messages_count
.set(self.pending_messages.len() as i64);
Expand Down

0 comments on commit 9025d49

Please sign in to comment.