From 1da3f7ea1df94312e7c6818c17bf4109f888e547 Mon Sep 17 00:00:00 2001 From: perekopskiy <53865202+perekopskiy@users.noreply.github.com> Date: Thu, 5 Sep 2024 18:02:27 +0300 Subject: [PATCH] feat(eth-watch): do not query events from earliest block (#2810) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Removes querying from the earliest batch in eth watch. Instead queries for constant block range and splits queried range in parts if needed ## Why ❔ Vanilla reth doesn't allow eth_logs requests where block range is greater than 1_000_000. This changes allows eth watch to work with this limitation. ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- core/node/eth_watch/src/client.rs | 139 +++++++++++++++++------------- 1 file changed, 78 insertions(+), 61 deletions(-) diff --git a/core/node/eth_watch/src/client.rs b/core/node/eth_watch/src/client.rs index 8d465109994..67e603041e6 100644 --- a/core/node/eth_watch/src/client.rs +++ b/core/node/eth_watch/src/client.rs @@ -88,75 +88,34 @@ impl EthHttpQueryClient { } } - async fn get_filter_logs( + fn get_default_address_list(&self) -> Vec
{ + [ + Some(self.diamond_proxy_addr), + Some(self.governance_address), + self.state_transition_manager_address, + self.chain_admin_address, + ] + .into_iter() + .flatten() + .collect() + } + + async fn get_events_inner( &self, from: BlockNumber, to: BlockNumber, - topics: Vec, + topics1: Vec, + topics2: Vec, + addresses: Vec
, + retries_left: usize, ) -> EnrichedClientResult> { let filter = FilterBuilder::default() - .address( - [ - Some(self.diamond_proxy_addr), - Some(self.governance_address), - self.state_transition_manager_address, - self.chain_admin_address, - ] - .into_iter() - .flatten() - .collect(), - ) .from_block(from) .to_block(to) - .topics(Some(topics), None, None, None) + .topics(Some(topics1), Some(topics2), None, None) + .address(addresses) .build(); - self.client.logs(&filter).await - } -} - -#[async_trait::async_trait] -impl EthClient for EthHttpQueryClient { - async fn scheduler_vk_hash( - &self, - verifier_address: Address, - ) -> Result { - // New verifier returns the hash of the verification key. - CallFunctionArgs::new("verificationKeyHash", ()) - .for_contract(verifier_address, &self.verifier_contract_abi) - .call(&self.client) - .await - } - - async fn diamond_cut_by_version( - &self, - packed_version: H256, - ) -> EnrichedClientResult>> { - let Some(state_transition_manager_address) = self.state_transition_manager_address else { - return Ok(None); - }; - - let filter = FilterBuilder::default() - .address(vec![state_transition_manager_address]) - .from_block(BlockNumber::Earliest) - .to_block(BlockNumber::Latest) - .topics( - Some(vec![self.new_upgrade_cut_data_signature]), - Some(vec![packed_version]), - None, - None, - ) - .build(); - let logs = self.client.logs(&filter).await?; - Ok(logs.into_iter().next().map(|log| log.data.0)) - } - - async fn get_events( - &self, - from: BlockNumber, - to: BlockNumber, - retries_left: usize, - ) -> EnrichedClientResult> { - let mut result = self.get_filter_logs(from, to, self.topics.clone()).await; + let mut result = self.client.logs(&filter).await; // This code is compatible with both Infura and Alchemy API providers. // Note: we don't handle rate-limits here - assumption is that we're never going to hit them. @@ -225,6 +184,64 @@ impl EthClient for EthHttpQueryClient { result } +} + +#[async_trait::async_trait] +impl EthClient for EthHttpQueryClient { + async fn scheduler_vk_hash( + &self, + verifier_address: Address, + ) -> Result { + // New verifier returns the hash of the verification key. + CallFunctionArgs::new("verificationKeyHash", ()) + .for_contract(verifier_address, &self.verifier_contract_abi) + .call(&self.client) + .await + } + + async fn diamond_cut_by_version( + &self, + packed_version: H256, + ) -> EnrichedClientResult>> { + const LOOK_BACK_BLOCK_RANGE: u64 = 1_000_000; + + let Some(state_transition_manager_address) = self.state_transition_manager_address else { + return Ok(None); + }; + + let to_block = self.client.block_number().await?; + let from_block = to_block.saturating_sub((LOOK_BACK_BLOCK_RANGE - 1).into()); + + let logs = self + .get_events_inner( + from_block.into(), + to_block.into(), + vec![self.new_upgrade_cut_data_signature], + vec![packed_version], + vec![state_transition_manager_address], + RETRY_LIMIT, + ) + .await?; + + Ok(logs.into_iter().next().map(|log| log.data.0)) + } + + async fn get_events( + &self, + from: BlockNumber, + to: BlockNumber, + retries_left: usize, + ) -> EnrichedClientResult> { + self.get_events_inner( + from, + to, + self.topics.clone(), + Vec::new(), + self.get_default_address_list(), + retries_left, + ) + .await + } async fn finalized_block_number(&self) -> EnrichedClientResult { if let Some(confirmations) = self.confirmations_for_eth_event {