From 24c36d32fcf3d25392580a7bfbd36ce46a9e259d Mon Sep 17 00:00:00 2001 From: Web3 Philosopher Date: Mon, 11 Mar 2024 23:24:42 +0300 Subject: [PATCH] fix event query rpc (#122) --- .github/workflows/docker.yml | 4 +++- Cargo.lock | 7 ++----- evm/abi/src/conversions.rs | 5 +++++ modules/client/Cargo.toml | 5 +---- modules/client/src/internals.rs | 30 ++++++++++++++---------------- modules/client/src/lib.rs | 8 ++++---- modules/client/src/types.rs | 10 ++-------- modules/client/tests/streams.rs | 11 ++++++----- modules/ismp/core/src/events.rs | 13 +++++++++++++ modules/ismp/pallet/rpc/src/lib.rs | 10 ++++------ parachain/node/Cargo.toml | 2 +- 11 files changed, 55 insertions(+), 50 deletions(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 7d86d6f27..b9f3f57d5 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -1,7 +1,9 @@ name: Docker Release on: - workflow_dispatch: + push: + tags: + - '**[0-9]+.[0-9]+.[0-9]+*' concurrency: group: release-${{ github.ref }} diff --git a/Cargo.lock b/Cargo.lock index 152278ef7..1d8a6a9f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5410,7 +5410,7 @@ dependencies = [ [[package]] name = "hyperbridge" -version = "0.3.6" +version = "0.3.7" dependencies = [ "clap", "cumulus-client-cli", @@ -5479,10 +5479,9 @@ dependencies = [ [[package]] name = "hyperclient" -version = "0.2.1" +version = "0.2.2" dependencies = [ "anyhow", - "async-trait", "console_error_panic_hook", "ethers", "fluvio-wasm-timer", @@ -5497,12 +5496,10 @@ dependencies = [ "parity-scale-codec", "primitive-types", "reconnecting-jsonrpsee-ws-client", - "reqwest", "serde", "serde-wasm-bindgen", "serde_json", "subxt", - "tiny-keccak", "tokio", "tracing", "tracing-wasm", diff --git a/evm/abi/src/conversions.rs b/evm/abi/src/conversions.rs index f0a87009d..53f68b06f 100644 --- a/evm/abi/src/conversions.rs +++ b/evm/abi/src/conversions.rs @@ -302,6 +302,11 @@ impl TryFrom for ismp::events::Event { timeout_timestamp: resp.res_timeout_timestamp.low_u64(), gas_limit: resp.res_gaslimit.low_u64(), })), + EvmHostEvents::PostRequestHandledFilter(handled) => + Ok(ismp::events::Event::PostRequestHandled(ismp::events::PostRequestHandled { + commitment: handled.commitment.into(), + relayer: handled.relayer.as_bytes().to_vec(), + })), event => Err(anyhow!("Unsupported event {event:?}")), } } diff --git a/modules/client/Cargo.toml b/modules/client/Cargo.toml index 712e2bc45..e44b13d5f 100644 --- a/modules/client/Cargo.toml +++ b/modules/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hyperclient" -version = "0.2.1" +version = "0.2.2" edition = "2021" description = "The hyperclient is a library for managing (in-flight) ISMP requests" repository = "https://github.com/polytope-labs/hyperbridge" @@ -18,13 +18,10 @@ hex-literal = { version = "0.4.1" } serde-wasm-bindgen = { version = "0.6.3", default-features = false } serde = { version = "1.0.196", features = ["derive"], default-features = false } wasm-bindgen-futures = "0.4.40" -tiny-keccak = { version = "2.0.2", features = ["keccak"] } codec = { package = "parity-scale-codec", version = "3.1.3", default-features = false } -async-trait = { version = "0.1.53", default-features = false } futures = "0.3.30" wasm-streams = "0.4.0" tokio = { version = "1.35.1", features = ["macros"] } -reqwest = { version = "0.11.23", features = ["json"] } wasm-bindgen-test = "0.3.41" js-sys = "0.3.68" web-sys = "0.3.68" diff --git a/modules/client/src/internals.rs b/modules/client/src/internals.rs index 8ca4d4841..a928ed3ad 100644 --- a/modules/client/src/internals.rs +++ b/modules/client/src/internals.rs @@ -280,11 +280,12 @@ pub async fn timeout_request_stream( } }; - let response = lambda().await; - match response { - Ok(res) => res, - Err(e) => Some((Err(anyhow!("Encountered an error in stream {e:?}")), state)), - } + lambda().await.unwrap_or_else(|e| { + Some(( + Err(anyhow!("Encountered an error in stream {e:?}")), + TimeoutStreamState::End, + )) + }) } }); @@ -496,6 +497,8 @@ pub async fn request_status_stream( }, PostStreamState::HyperbridgeFinalized(finalized_height) => { let res = dest_client.query_request_receipt(hash).await?; + let request_commitment = + hash_request::(&Request::Post(post.clone())); if res != H160::zero() { let latest_height = dest_client.query_latest_block_height().await?; let meta = dest_client @@ -503,9 +506,8 @@ pub async fn request_status_stream( .await? .into_iter() .find_map(|event| match event.event { - Event::PostRequest(post_event) - if post.source == post_event.source && - post.nonce == post_event.nonce => + Event::PostRequestHandled(handled) + if handled.commitment == request_commitment => Some(event.meta), _ => None, }) @@ -539,14 +541,10 @@ pub async fn request_status_stream( } }; - let response = lambda().await; - match response { - Ok(res) => res, - Err(e) => Some(( - Err(anyhow!("Encountered an error in stream {e:?}")), - post_request_status, - )), - } + // terminate the stream once an error is encountered + lambda().await.unwrap_or_else(|e| { + Some((Err(anyhow!("Encountered an error in stream {e:?}")), PostStreamState::End)) + }) } }); diff --git a/modules/client/src/lib.rs b/modules/client/src/lib.rs index df332c120..feac24058 100644 --- a/modules/client/src/lib.rs +++ b/modules/client/src/lib.rs @@ -158,8 +158,8 @@ pub async fn query_response_status( /// calldata: Vec, /// } /// -/// // An error was encountered in the stream, errors are recoverable so it's safe to continue -/// polling. interface Error { +/// // An error was encountered in the stream, the stream will come to an end. +/// interface Error { /// kind: "Error"; /// // error description /// description: string @@ -247,8 +247,8 @@ pub async fn timeout_post_request( /// kind: "Timeout"; /// } /// -/// // An error was encountered in the stream, errors are recoverable so it's safe to continue -/// polling. interface Error { +/// // An error was encountered in the stream, the stream will come to an end. +/// interface Error { /// kind: "Error"; /// // error description /// description: string diff --git a/modules/client/src/types.rs b/modules/client/src/types.rs index b6ed62b31..284fe169c 100644 --- a/modules/client/src/types.rs +++ b/modules/client/src/types.rs @@ -3,7 +3,7 @@ use alloc::collections::BTreeMap; use anyhow::anyhow; use codec::Encode; use core::pin::Pin; -use ethers::types::H160; +use ethers::{types::H160, utils::keccak256}; use futures::Stream; use ismp::{consensus::ConsensusStateId, host::StateMachine}; use serde::{Deserialize, Serialize}; @@ -30,13 +30,7 @@ pub struct KeccakHasher; impl Hasher for KeccakHasher { type Output = H256; fn hash(s: &[u8]) -> Self::Output { - use tiny_keccak::Hasher; - - let mut keccak = tiny_keccak::Keccak::v256(); - let mut output = H256::default(); - keccak.update(s); - keccak.finalize(&mut output[..]); - output + keccak256(s).into() } } diff --git a/modules/client/tests/streams.rs b/modules/client/tests/streams.rs index 5105cbf8b..45d36139c 100644 --- a/modules/client/tests/streams.rs +++ b/modules/client/tests/streams.rs @@ -84,8 +84,8 @@ async fn subscribe_to_request_status() -> Result<(), anyhow::Error> { }; let hyperbrige_config = SubstrateConfig { - // rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(), - rpc_url: "ws://127.0.0.1:9944".to_string(), + rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(), + // rpc_url: "ws://127.0.0.1:9944".to_string(), consensus_state_id: *b"PARA", hash_algo: HashAlgorithm::Keccak, }; @@ -162,7 +162,8 @@ async fn subscribe_to_request_status() -> Result<(), anyhow::Error> { tracing::info!("Got Status {:?}", status); }, Err(e) => { - tracing::info!("Error: {e:?}") + tracing::info!("Error: {e:?}"); + Err(e)? }, } } @@ -195,8 +196,8 @@ async fn test_timeout_request() -> Result<(), anyhow::Error> { }; let hyperbrige_config = SubstrateConfig { - // rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(), - rpc_url: "ws://127.0.0.1:9944".to_string(), + rpc_url: "wss://hyperbridge-gargantua-rpc.blockops.network:443".to_string(), + // rpc_url: "ws://127.0.0.1:9944".to_string(), consensus_state_id: *b"PARA", hash_algo: HashAlgorithm::Keccak, }; diff --git a/modules/ismp/core/src/events.rs b/modules/ismp/core/src/events.rs index 71d616588..e3971578a 100644 --- a/modules/ismp/core/src/events.rs +++ b/modules/ismp/core/src/events.rs @@ -4,7 +4,9 @@ use crate::{ consensus::StateMachineId, router::{Get, Post, PostResponse}, }; +use alloc::vec::Vec; use codec::{Decode, Encode}; +use primitive_types::H256; use scale_info::TypeInfo; /// Emitted when a state machine is successfully updated to a new height after the challenge period @@ -17,6 +19,15 @@ pub struct StateMachineUpdated { pub latest_height: u64, } +/// Emitted when a post request is successfully handled. +#[derive(Clone, Debug, TypeInfo, Encode, Decode, serde::Deserialize, serde::Serialize)] +pub struct PostRequestHandled { + /// The commitment to the request + pub commitment: H256, + /// The address of the relayer responsible for relaying the request + pub relayer: Vec, +} + /// This represents events that should be emitted by ismp-rs wrappers #[derive(Clone, Debug, TypeInfo, Encode, Decode, serde::Deserialize, serde::Serialize)] pub enum Event { @@ -29,4 +40,6 @@ pub enum Event { PostResponse(PostResponse), /// An event that is emitted when a get request is dispatched GetRequest(Get), + /// Emitted when a post request is handled + PostRequestHandled(PostRequestHandled), } diff --git a/modules/ismp/pallet/rpc/src/lib.rs b/modules/ismp/pallet/rpc/src/lib.rs index 3c2ff0f6d..ca3522332 100644 --- a/modules/ismp/pallet/rpc/src/lib.rs +++ b/modules/ismp/pallet/rpc/src/lib.rs @@ -330,10 +330,9 @@ where .map_err(|e| runtime_error_into_rpc_error(e.to_string()))? .ok_or_else(|| runtime_error_into_rpc_error("Invalid block number or hash provided"))?; - let mut api = self.client.runtime_api(); - api.register_extension(OffchainDbExt::new(self.offchain_db.clone())); - while header.number() >= from_block.number() { + let mut api = self.client.runtime_api(); + api.register_extension(OffchainDbExt::new(self.offchain_db.clone())); let at = header.hash(); let mut request_commitments = vec![]; @@ -432,10 +431,9 @@ where .map_err(|e| runtime_error_into_rpc_error(e.to_string()))? .ok_or_else(|| runtime_error_into_rpc_error("Invalid block number or hash provided"))?; - let mut api = self.client.runtime_api(); - api.register_extension(OffchainDbExt::new(self.offchain_db.clone())); - while header.number() >= from_block.number() { + let mut api = self.client.runtime_api(); + api.register_extension(OffchainDbExt::new(self.offchain_db.clone())); let at = header.hash(); let block_events = api.block_events_with_metadata(at).map_err(|e| { diff --git a/parachain/node/Cargo.toml b/parachain/node/Cargo.toml index 996043c62..cb628c9b7 100644 --- a/parachain/node/Cargo.toml +++ b/parachain/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hyperbridge" -version = "0.3.6" +version = "0.3.7" authors = ["Polytope Labs "] description = "The Hyperbridge coprocessor node" repository = "https://github.com/polytope-labs/hyperbridge"