From 4252bef7fabc7b7e3bd548653a54a1adcb2e41ca Mon Sep 17 00:00:00 2001 From: Gabriel de Quadros Ligneul Date: Wed, 24 Jan 2024 18:08:55 -0300 Subject: [PATCH] fix: high RPC usage in dispatcher --- CHANGELOG.md | 4 +++ offchain/dispatcher/src/dispatcher.rs | 8 ++++++ offchain/dispatcher/src/drivers/mock.rs | 2 ++ offchain/types/src/foldables.rs | 37 +++++++++++++++++-------- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9943fef27..46fb24340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Removed concurrent block fetch in foldable `InputBox`. - Removed snapshot-saving feature. Now, the node will always start from the beginning. +## Fixed + +- Fixed high RPC usage by filtering the input added event by the application address. + ## [1.2.0] ### Added diff --git a/offchain/dispatcher/src/dispatcher.rs b/offchain/dispatcher/src/dispatcher.rs index a1e00640d..da5693ae8 100644 --- a/offchain/dispatcher/src/dispatcher.rs +++ b/offchain/dispatcher/src/dispatcher.rs @@ -59,6 +59,14 @@ pub async fn start( ); let initial_state = InputBoxInitialState { + dapp_address: Arc::new( + config + .blockchain_config + .dapp_address + .clone() + .into_inner() + .into(), + ), input_box_address: Arc::new( config .blockchain_config diff --git a/offchain/dispatcher/src/drivers/mock.rs b/offchain/dispatcher/src/drivers/mock.rs index 29b832008..d56acd616 100644 --- a/offchain/dispatcher/src/drivers/mock.rs +++ b/offchain/dispatcher/src/drivers/mock.rs @@ -46,6 +46,7 @@ pub fn new_input(timestamp: u32) -> Input { pub fn new_input_box() -> InputBox { InputBox { + dapp_address: Arc::new(H160::random()), input_box_address: Arc::new(H160::random()), dapp_input_boxes: Arc::new(hashmap! {}), } @@ -65,6 +66,7 @@ pub fn update_input_box( .dapp_input_boxes .update(Arc::new(dapp_address), Arc::new(DAppInputBox { inputs })); InputBox { + dapp_address: Arc::new(dapp_address), input_box_address: input_box.input_box_address, dapp_input_boxes: Arc::new(dapp_input_boxes), } diff --git a/offchain/types/src/foldables.rs b/offchain/types/src/foldables.rs index 74813f269..71d7cb3e1 100644 --- a/offchain/types/src/foldables.rs +++ b/offchain/types/src/foldables.rs @@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex}; #[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct InputBoxInitialState { + pub dapp_address: Arc
, pub input_box_address: Arc
, } @@ -44,6 +45,7 @@ pub struct DAppInputBox { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct InputBox { + pub dapp_address: Arc
, pub input_box_address: Arc
, pub dapp_input_boxes: Arc, Arc>>, } @@ -60,6 +62,7 @@ impl Foldable for InputBox { env: &StateFoldEnvironment, access: Arc>, ) -> Result { + let dapp_address = Arc::clone(&initial_state.dapp_address); let input_box_address = Arc::clone(&initial_state.input_box_address); Ok(Self { @@ -68,9 +71,11 @@ impl Foldable for InputBox { access, env, &input_box_address, + &dapp_address, None, ) .await?, + dapp_address, input_box_address, }) } @@ -82,15 +87,16 @@ impl Foldable for InputBox { env: &StateFoldEnvironment, access: Arc>, ) -> Result { + let dapp_address = Arc::clone(&previous_state.dapp_address); let input_box_address = Arc::clone(&previous_state.input_box_address); - if !(fold_utils::contains_address( - &block.logs_bloom, - &input_box_address, - ) && (fold_utils::contains_topic( - &block.logs_bloom, - &contracts::input_box::InputAddedFilter::signature(), - ))) { + if !fold_utils::contains_address(&block.logs_bloom, &input_box_address) + || !fold_utils::contains_topic(&block.logs_bloom, &*dapp_address) + || !fold_utils::contains_topic( + &block.logs_bloom, + &contracts::input_box::InputAddedFilter::signature(), + ) + { return Ok(previous_state.clone()); } @@ -100,10 +106,11 @@ impl Foldable for InputBox { access, env, &input_box_address, + &dapp_address, None, ) .await?, - + dapp_address, input_box_address, }) } @@ -114,14 +121,20 @@ async fn updated_inputs( provider: Arc, env: &StateFoldEnvironment::UserData>, contract_address: &Address, + dapp_address: &Address, block_opt: Option, // TODO: Option>, ) -> Result, Arc>>, FoldableError> { let mut input_boxes = previous_input_boxes.cloned().unwrap_or(HashMap::new()); - let new_inputs = - fetch_all_new_inputs(provider, env, contract_address, block_opt) - .await?; + let new_inputs = fetch_all_new_inputs( + provider, + env, + contract_address, + dapp_address, + block_opt, + ) + .await?; for input in new_inputs { let dapp = input.dapp.clone(); @@ -151,6 +164,7 @@ async fn fetch_all_new_inputs< provider: Arc, env: &StateFoldEnvironment::UserData>, contract_address: &Address, + dapp_address: &Address, block_opt: Option, // TODO: Option>, ) -> Result, FoldableError> { use contracts::input_box::*; @@ -159,6 +173,7 @@ async fn fetch_all_new_inputs< // Retrieve `InputAdded` events let input_events = contract .input_added_filter() + .topic1(*dapp_address) .query_with_meta() .await .context("Error querying for input added events")?;