Skip to content

Commit

Permalink
fix: high RPC usage in dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
gligneul committed Jan 29, 2024
1 parent e5adb9f commit 4252bef
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed concurrent block fetch in foldable `InputBox`.
- Removed snapshot-saving feature. Now, the node will always start from the beginning.

## Fixed

- Fixed high RPC usage by filtering the input added event by the application address.

## [1.2.0]

### Added
Expand Down
8 changes: 8 additions & 0 deletions offchain/dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ pub async fn start(
);

let initial_state = InputBoxInitialState {
dapp_address: Arc::new(
config
.blockchain_config
.dapp_address
.clone()
.into_inner()
.into(),
),
input_box_address: Arc::new(
config
.blockchain_config
Expand Down
2 changes: 2 additions & 0 deletions offchain/dispatcher/src/drivers/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub fn new_input(timestamp: u32) -> Input {

pub fn new_input_box() -> InputBox {
InputBox {
dapp_address: Arc::new(H160::random()),
input_box_address: Arc::new(H160::random()),
dapp_input_boxes: Arc::new(hashmap! {}),
}
Expand All @@ -65,6 +66,7 @@ pub fn update_input_box(
.dapp_input_boxes
.update(Arc::new(dapp_address), Arc::new(DAppInputBox { inputs }));
InputBox {
dapp_address: Arc::new(dapp_address),
input_box_address: input_box.input_box_address,
dapp_input_boxes: Arc::new(dapp_input_boxes),
}
Expand Down
37 changes: 26 additions & 11 deletions offchain/types/src/foldables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct InputBoxInitialState {
pub dapp_address: Arc<Address>,
pub input_box_address: Arc<Address>,
}

Expand All @@ -44,6 +45,7 @@ pub struct DAppInputBox {

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InputBox {
pub dapp_address: Arc<Address>,
pub input_box_address: Arc<Address>,
pub dapp_input_boxes: Arc<HashMap<Arc<Address>, Arc<DAppInputBox>>>,
}
Expand All @@ -60,6 +62,7 @@ impl Foldable for InputBox {
env: &StateFoldEnvironment<M, Self::UserData>,
access: Arc<SyncMiddleware<M>>,
) -> Result<Self, Self::Error> {
let dapp_address = Arc::clone(&initial_state.dapp_address);
let input_box_address = Arc::clone(&initial_state.input_box_address);

Ok(Self {
Expand All @@ -68,9 +71,11 @@ impl Foldable for InputBox {
access,
env,
&input_box_address,
&dapp_address,
None,
)
.await?,
dapp_address,
input_box_address,
})
}
Expand All @@ -82,15 +87,16 @@ impl Foldable for InputBox {
env: &StateFoldEnvironment<M, Self::UserData>,
access: Arc<FoldMiddleware<M>>,
) -> Result<Self, Self::Error> {
let dapp_address = Arc::clone(&previous_state.dapp_address);
let input_box_address = Arc::clone(&previous_state.input_box_address);

if !(fold_utils::contains_address(
&block.logs_bloom,
&input_box_address,
) && (fold_utils::contains_topic(
&block.logs_bloom,
&contracts::input_box::InputAddedFilter::signature(),
))) {
if !fold_utils::contains_address(&block.logs_bloom, &input_box_address)
|| !fold_utils::contains_topic(&block.logs_bloom, &*dapp_address)
|| !fold_utils::contains_topic(
&block.logs_bloom,
&contracts::input_box::InputAddedFilter::signature(),
)
{
return Ok(previous_state.clone());
}

Expand All @@ -100,10 +106,11 @@ impl Foldable for InputBox {
access,
env,
&input_box_address,
&dapp_address,
None,
)
.await?,

dapp_address,
input_box_address,
})
}
Expand All @@ -114,14 +121,20 @@ async fn updated_inputs<M1: Middleware + 'static, M2: Middleware + 'static>(
provider: Arc<M1>,
env: &StateFoldEnvironment<M2, <InputBox as Foldable>::UserData>,
contract_address: &Address,
dapp_address: &Address,
block_opt: Option<Block>, // TODO: Option<Arc<Block>>,
) -> Result<Arc<HashMap<Arc<Address>, Arc<DAppInputBox>>>, FoldableError> {
let mut input_boxes =
previous_input_boxes.cloned().unwrap_or(HashMap::new());

let new_inputs =
fetch_all_new_inputs(provider, env, contract_address, block_opt)
.await?;
let new_inputs = fetch_all_new_inputs(
provider,
env,
contract_address,
dapp_address,
block_opt,
)
.await?;

for input in new_inputs {
let dapp = input.dapp.clone();
Expand Down Expand Up @@ -151,6 +164,7 @@ async fn fetch_all_new_inputs<
provider: Arc<M1>,
env: &StateFoldEnvironment<M2, <InputBox as Foldable>::UserData>,
contract_address: &Address,
dapp_address: &Address,
block_opt: Option<Block>, // TODO: Option<Arc<Block>>,
) -> Result<Vec<Input>, FoldableError> {
use contracts::input_box::*;
Expand All @@ -159,6 +173,7 @@ async fn fetch_all_new_inputs<
// Retrieve `InputAdded` events
let input_events = contract
.input_added_filter()
.topic1(*dapp_address)
.query_with_meta()
.await
.context("Error querying for input added events")?;
Expand Down

0 comments on commit 4252bef

Please sign in to comment.