Skip to content

Commit

Permalink
fix: high RPC usage in dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
gligneul committed Jan 24, 2024
1 parent 2d96001 commit 66025f7
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed concurrent block fetch in foldable `InputBox`.
- Removed snapshot-saving feature. Now, the node will always start from the beginning.

## Fixed

- Fixed high RPC usage by filtering the input added event by the application address.

## [1.2.0]

### Added
Expand Down
8 changes: 8 additions & 0 deletions offchain/dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ pub async fn start(
);

let initial_state = InputBoxInitialState {
dapp_address: Arc::new(
config
.blockchain_config
.dapp_address
.clone()
.into_inner()
.into(),
),
input_box_address: Arc::new(
config
.blockchain_config
Expand Down
2 changes: 2 additions & 0 deletions offchain/dispatcher/src/drivers/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub fn new_input(timestamp: u32) -> Input {

pub fn new_input_box() -> InputBox {
InputBox {
dapp_address: Arc::new(H160::random()),
input_box_address: Arc::new(H160::random()),
dapp_input_boxes: Arc::new(hashmap! {}),
}
Expand All @@ -65,6 +66,7 @@ pub fn update_input_box(
.dapp_input_boxes
.update(Arc::new(dapp_address), Arc::new(DAppInputBox { inputs }));
InputBox {
dapp_address: Arc::new(dapp_address),
input_box_address: input_box.input_box_address,
dapp_input_boxes: Arc::new(dapp_input_boxes),
}
Expand Down
29 changes: 24 additions & 5 deletions offchain/types/src/foldables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use eth_state_fold_types::{
contract::LogMeta,
prelude::EthEvent,
providers::Middleware,
types::{Address, TxHash},
types::{Address, TxHash, ValueOrArray, H160},
},
Block,
};
Expand All @@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct InputBoxInitialState {
pub dapp_address: Arc<Address>,
pub input_box_address: Arc<Address>,
}

Expand All @@ -44,6 +45,7 @@ pub struct DAppInputBox {

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InputBox {
pub dapp_address: Arc<Address>,
pub input_box_address: Arc<Address>,
pub dapp_input_boxes: Arc<HashMap<Arc<Address>, Arc<DAppInputBox>>>,
}
Expand All @@ -60,6 +62,7 @@ impl Foldable for InputBox {
env: &StateFoldEnvironment<M, Self::UserData>,
access: Arc<SyncMiddleware<M>>,
) -> Result<Self, Self::Error> {
let dapp_address = Arc::clone(&initial_state.dapp_address);
let input_box_address = Arc::clone(&initial_state.input_box_address);

Ok(Self {
Expand All @@ -68,9 +71,11 @@ impl Foldable for InputBox {
access,
env,
&input_box_address,
&dapp_address,
None,
)
.await?,
dapp_address,
input_box_address,
})
}
Expand All @@ -82,6 +87,7 @@ impl Foldable for InputBox {
env: &StateFoldEnvironment<M, Self::UserData>,
access: Arc<FoldMiddleware<M>>,
) -> Result<Self, Self::Error> {
let dapp_address = Arc::clone(&previous_state.dapp_address);
let input_box_address = Arc::clone(&previous_state.input_box_address);

if !(fold_utils::contains_address(
Expand All @@ -100,10 +106,11 @@ impl Foldable for InputBox {
access,
env,
&input_box_address,
&dapp_address,
None,
)
.await?,

dapp_address,
input_box_address,
})
}
Expand All @@ -114,14 +121,20 @@ async fn updated_inputs<M1: Middleware + 'static, M2: Middleware + 'static>(
provider: Arc<M1>,
env: &StateFoldEnvironment<M2, <InputBox as Foldable>::UserData>,
contract_address: &Address,
dapp_address: &Address,
block_opt: Option<Block>, // TODO: Option<Arc<Block>>,
) -> Result<Arc<HashMap<Arc<Address>, Arc<DAppInputBox>>>, FoldableError> {
let mut input_boxes =
previous_input_boxes.cloned().unwrap_or(HashMap::new());

let new_inputs =
fetch_all_new_inputs(provider, env, contract_address, block_opt)
.await?;
let new_inputs = fetch_all_new_inputs(
provider,
env,
contract_address,
dapp_address,
block_opt,
)
.await?;

for input in new_inputs {
let dapp = input.dapp.clone();
Expand Down Expand Up @@ -151,14 +164,20 @@ async fn fetch_all_new_inputs<
provider: Arc<M1>,
env: &StateFoldEnvironment<M2, <InputBox as Foldable>::UserData>,
contract_address: &Address,
dapp_address: &Address,
block_opt: Option<Block>, // TODO: Option<Arc<Block>>,
) -> Result<Vec<Input>, FoldableError> {
use contracts::input_box::*;
let contract = InputBox::new(*contract_address, Arc::clone(&provider));

// The application address is the first indexed parameter of the InputAdded event
let dapp_address = H160(dapp_address.to_owned().into());
let dapp_address_index = ValueOrArray::Value(Some(dapp_address.into()));

// Retrieve `InputAdded` events
let input_events = contract
.input_added_filter()
.topic1(dapp_address_index)
.query_with_meta()
.await
.context("Error querying for input added events")?;
Expand Down

0 comments on commit 66025f7

Please sign in to comment.