Skip to content

Commit

Permalink
Split messages into batches (#34)
Browse files Browse the repository at this point in the history
* chunk transactions

* fix

* fix previous height

* polkadot update

* lock file update

* fix beefy sync

* fix base case in beefy sync

* fix beefy sync

* update runtime types

* fix generated code

* update ping module address

* update abi

---------

Co-authored-by: Seun Lanlege <[email protected]>
  • Loading branch information
Wizdave97 and seunlanlege authored Oct 23, 2023
1 parent 83c239e commit 931f71d
Show file tree
Hide file tree
Showing 19 changed files with 1,743 additions and 1,721 deletions.
577 changes: 289 additions & 288 deletions Cargo.lock

Large diffs are not rendered by default.

2,242 changes: 1,113 additions & 1,129 deletions ethereum/evm/abis/PingModule.json

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions ethereum/evm/src/abi/ping_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ pub mod ping_module {
kind: ::ethers::core::abi::ethabi::ParamType::Tuple(::std::vec![
::ethers::core::abi::ethabi::ParamType::Bytes,
::ethers::core::abi::ethabi::ParamType::Address,
::ethers::core::abi::ethabi::ParamType::Uint(64usize),
],),
internal_type: ::core::option::Option::Some(
::std::borrow::ToOwned::to_owned("struct PingMessage"),
Expand Down Expand Up @@ -453,10 +454,10 @@ pub mod ping_module {
.method_hash([199, 21, 245, 43], (request,))
.expect("method not found (this should never happen)")
}
///Calls the contract's `ping` (0xe23c69b1) function
///Calls the contract's `ping` (0x40ffb7bc) function
pub fn ping(&self, msg: PingMessage) -> ::ethers::contract::builders::ContractCall<M, ()> {
self.0
.method_hash([226, 60, 105, 177], (msg,))
.method_hash([64, 255, 183, 188], (msg,))
.expect("method not found (this should never happen)")
}
///Gets the contract's `GetResponseReceived` event
Expand Down Expand Up @@ -915,7 +916,7 @@ pub mod ping_module {
pub request: PostRequest,
}
///Container type for all input parameters for the `ping` function with signature
/// `ping((bytes,address))` and selector `0xe23c69b1`
/// `ping((bytes,address,uint64))` and selector `0x40ffb7bc`
#[derive(
Clone,
::ethers::contract::EthCall,
Expand All @@ -926,7 +927,7 @@ pub mod ping_module {
Eq,
Hash,
)]
#[ethcall(name = "ping", abi = "ping((bytes,address))")]
#[ethcall(name = "ping", abi = "ping((bytes,address,uint64))")]
pub struct PingCall {
pub msg: PingMessage,
}
Expand Down Expand Up @@ -1091,7 +1092,7 @@ pub mod ping_module {
Hash,
)]
pub struct DispatchWithRequestReturn(pub [u8; 32]);
///`PingMessage(bytes,address)`
///`PingMessage(bytes,address,uint64)`
#[derive(
Clone,
::ethers::contract::EthAbiType,
Expand All @@ -1105,5 +1106,6 @@ pub mod ping_module {
pub struct PingMessage {
pub dest: ::ethers::core::types::Bytes,
pub module: ::ethers::core::types::Address,
pub timeout: u64,
}
}
5 changes: 1 addition & 4 deletions ethereum/evm/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::EvmClient;
use std::sync::Arc;
use tesseract_primitives::{
BoxStream, ByzantineHandler, ChallengePeriodStarted, IsmpHost, IsmpProvider, Reconnect,
};
Expand Down Expand Up @@ -50,12 +49,10 @@ where
let nonce_provider = self.nonce_provider.clone();
self.host.reconnect(counterparty).await?;
let host = self.host.clone();
let latest_height = *self.latest_height.lock();
let mut new_client = EvmClient::new(host, self.config.clone(), counterparty).await?;
if let Some(nonce_provider) = nonce_provider {
new_client.set_nonce_provider(nonce_provider);
}
new_client.set_latest_height(latest_height);
*self = new_client;
Ok(())
}
Expand All @@ -69,7 +66,7 @@ impl<T: IsmpHost + Clone> Clone for EvmClient<T> {
signer: self.signer.clone(),
consensus_state_id: self.consensus_state_id,
state_machine: self.state_machine,
latest_height: Arc::clone(&self.latest_height),
initial_height: self.initial_height,
ismp_host: self.ismp_host,
handler: self.handler,
nonce_provider: self.nonce_provider.clone(),
Expand Down
13 changes: 4 additions & 9 deletions ethereum/evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use ismp::{
host::{Ethereum, StateMachine},
};
use jsonrpsee::ws_client::WsClientBuilder;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sp_core::{bytes::from_hex, keccak_256, Pair, H160};
use std::sync::Arc;
Expand Down Expand Up @@ -89,7 +88,7 @@ pub struct EvmClient<I> {
/// State machine Identifier for this client.
state_machine: StateMachine,
/// Latest state machine height.
latest_height: Arc<Mutex<u64>>,
initial_height: u64,
/// Ismp Host contract address
ismp_host: H160,
/// Ismp Handler contract address
Expand Down Expand Up @@ -151,7 +150,7 @@ where
signer,
consensus_state_id,
state_machine: config.state_machine,
latest_height: Arc::new(Mutex::new(latest_height)),
initial_height: latest_height,
ismp_host: config.ismp_host,
handler: config.handler,
gas_limit: config.gas_limit,
Expand Down Expand Up @@ -184,7 +183,7 @@ where

// let gas = call.estimate_gas().await?; // todo: fix estimate gas
// dbg!(gas);
call.gas(10_000_000).send().await?.await?;
call.nonce(self.get_nonce().await?).gas(10_000_000).send().await?.await?;

Ok(())
}
Expand All @@ -200,7 +199,7 @@ where

// let gas = call.estimate_gas().await?; // todo: fix estimate gas
// dbg!(gas);
call.gas(10_000_000).send().await?.await?;
call.nonce(self.get_nonce().await?).gas(10_000_000).send().await?.await?;

Ok(())
}
Expand All @@ -220,10 +219,6 @@ where
derive_map_key(key.0.to_vec(), REQUEST_RECEIPTS_SLOT)
}

pub fn set_latest_height(&mut self, height: u64) {
self.latest_height = Arc::new(Mutex::new(height))
}

pub fn set_nonce_provider(&mut self, nonce_provider: NonceProvider) {
self.nonce_provider = Some(nonce_provider);
}
Expand Down
11 changes: 6 additions & 5 deletions ethereum/evm/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,35 +215,35 @@ mod tests {
use std::sync::Arc;

#[tokio::test]
#[ignore]
async fn test_ping() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let op_url = std::env::var("OP_URL").expect("OP_URL must be set.");
let base_url = std::env::var("BASE_URL").expect("OP_URL must be set.");
let arb_url = std::env::var("ARB_URL").expect("OP_URL must be set.");
let geth_url = std::env::var("GETH_URL").expect("OP_URL must be set.");

let chains = vec![
(
StateMachine::Ethereum(Ethereum::ExecutionLayer),
H160(hex!("53920d815e1518eebDa3c09D614A6ce59d9fb4B0")),
H160(hex!("be094ba30775301FDc5ABE6095e1457073825b40")),
geth_url,
5u64,
),
(
StateMachine::Ethereum(Ethereum::Arbitrum),
H160(hex!("4E97A39f8Be6b568Df76dc7e9B141e53c1e519EF")),
H160(hex!("2Fc23c39Bd341ba467349725e6ab61B2DA9D49c1")),
arb_url,
421613,
),
(
StateMachine::Ethereum(Ethereum::Optimism),
H160(hex!("617Ba1259FDFAc28c2B192B50057f3D62FeCB33b")),
H160(hex!("aA505C51C975ee19c5A2BB080245c20CCE6D3E51")),
op_url,
420,
),
(
StateMachine::Ethereum(Ethereum::Base),
H160(hex!("F1a722eC517e5F4dCb78ef09908efb52dB6D6180")),
H160(hex!("02b20A2db3c97203Da489a53ed3316D37389a779")),
base_url,
84531,
),
Expand All @@ -266,6 +266,7 @@ mod tests {
.ping(PingMessage {
dest: chain.to_string().as_bytes().to_vec().into(),
module: address.clone().into(),
timeout: 10 * 60 * 60,
})
.gas(10_000_000)
.send()
Expand Down
20 changes: 15 additions & 5 deletions ethereum/evm/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,17 @@ where
Ok(state_proof.encode())
}

async fn query_ismp_events(&self, event: StateMachineUpdated) -> Result<Vec<Event>, Error> {
let latest_height = Arc::clone(&self.latest_height);
let previous_height = *latest_height.lock() + 1;
let events = self.events(previous_height, event.latest_height).await?;
*latest_height.lock() = event.latest_height;
async fn query_ismp_events(
&self,
previous_height: u64,
event: StateMachineUpdated,
) -> Result<Vec<Event>, Error> {
let range = (previous_height + 1)..=event.latest_height;
if range.is_empty() {
return Ok(Default::default())
}
let events = self.events(previous_height + 1, event.latest_height).await?;
log::info!("querying: {range:?}");
Ok(events)
}

Expand All @@ -212,6 +218,10 @@ where
self.gas_limit
}

fn initial_height(&self) -> u64 {
self.initial_height
}

async fn estimate_gas(&self, _msg: Vec<Message>) -> Result<u64, Error> {
todo!()
}
Expand Down
5 changes: 5 additions & 0 deletions ethereum/sync-committee/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl IsmpProvider for MockHost {

async fn query_ismp_events(
&self,
_previous_height: u64,
_event: StateMachineUpdated,
) -> Result<Vec<Event>, anyhow::Error> {
todo!()
Expand All @@ -149,6 +150,10 @@ impl IsmpProvider for MockHost {
todo!()
}

fn initial_height(&self) -> u64 {
0
}

async fn estimate_gas(&self, _msg: Vec<Message>) -> Result<u64, anyhow::Error> {
todo!()
}
Expand Down
4 changes: 2 additions & 2 deletions ethereum/sync-committee/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn check_consensus_notification() -> anyhow::Result<()> {
for (state_machine_id, l2_oracle) in op_stack {
println!("Verifying {state_machine_id:?} payload proof");
if let Some(payload) = op_stack_payload.remove(&state_machine_id) {
let state = verify_optimism_payload::<Host>(
let _state = verify_optimism_payload::<Host>(
payload,
&state_root[..],
l2_oracle.into(),
Expand All @@ -178,7 +178,7 @@ async fn check_consensus_notification() -> anyhow::Result<()> {

if let Some(arbitrum_payload) = arbitrum_payload {
println!("Verifying arbitrum payload proof");
let state = verify_arbitrum_payload::<Host>(
let _state = verify_arbitrum_payload::<Host>(
arbitrum_payload,
&state_root[..],
hex_literal::hex!("45e5cAea8768F42B385A366D3551Ad1e0cbFAb17").into(),
Expand Down
49 changes: 33 additions & 16 deletions messaging/src/event_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,19 @@ where
let mut get_responses = vec![];

if !post_request_queries.is_empty() {
let requests_proof = source
.query_requests_proof(state_machine_height.height, post_request_queries)
.await?;
let msg = RequestMessage {
requests: post_requests,
proof: Proof { height: state_machine_height, proof: requests_proof },
};
messages.push(Message::Request(msg));
let chunks = chunk_size(sink.state_machine_id().state_id);
let query_chunks = post_request_queries.chunks(chunks);
let post_request_chunks = post_requests.chunks(chunks);
for (queries, post_requests) in query_chunks.into_iter().zip(post_request_chunks) {
let requests_proof = source
.query_requests_proof(state_machine_height.height, queries.to_vec())
.await?;
let msg = RequestMessage {
requests: post_requests.to_vec(),
proof: Proof { height: state_machine_height, proof: requests_proof },
};
messages.push(Message::Request(msg));
}
}

// Let's handle get requests
Expand All @@ -143,14 +148,19 @@ where
}

if !response_queries.is_empty() {
let responses_proof = source
.query_responses_proof(state_machine_height.height, response_queries)
.await?;
let msg = ResponseMessage::Post {
responses: post_responses,
proof: Proof { height: state_machine_height, proof: responses_proof },
};
messages.push(Message::Response(msg));
let chunks = chunk_size(sink.state_machine_id().state_id);
let query_chunks = response_queries.chunks(chunks);
let post_request_chunks = post_responses.chunks(chunks);
for (queries, post_responses) in query_chunks.into_iter().zip(post_request_chunks) {
let responses_proof = source
.query_responses_proof(state_machine_height.height, queries.to_vec())
.await?;
let msg = ResponseMessage::Post {
responses: post_responses.to_vec(),
proof: Proof { height: state_machine_height, proof: responses_proof },
};
messages.push(Message::Response(msg));
}
}

Ok((messages, get_responses))
Expand All @@ -172,6 +182,13 @@ pub fn filter_events(
}
}

fn chunk_size(state_machine: StateMachine) -> usize {
match state_machine {
StateMachine::Ethereum(_) => 100,
_ => 200,
}
}

pub struct Hasher;

impl Keccak256 for Hasher {
Expand Down
Loading

0 comments on commit 931f71d

Please sign in to comment.