Skip to content

Commit

Permalink
refactor(input-reader): update state-fold use
Browse files Browse the repository at this point in the history
  • Loading branch information
GMKrieger committed Sep 12, 2023
1 parent 9ed8c8e commit a81e2ba
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 117 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added authority claimer service to support reader mode
- Added `block-history` and `state-fold` crates to `eth-input-reader`
- Added `SFConfig` and `BHConfig` configuration to `eth-input-reader`
- Added `subscription_depth` configuration to `eth-input-reader`

### Changed

- Renamed `dispatcher` service to `eth-input-reader`

### Removed

- Removed `state-client-lib` crate from `eth-input-reader`
- Removed `SCConfig` configuration from `eth-input-reader`
- Removed claiming functionality from `eth-input-reader`

## [1.0.0] 2023-08-22
Expand Down
22 changes: 3 additions & 19 deletions offchain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions offchain/eth-input-reader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ async-trait.workspace = true
axum.workspace = true
backoff = { workspace = true, features = ["tokio"] }
clap = { workspace = true, features = ["derive", "env"] }
eth-state-client-lib.workspace = true
eth-state-fold-types = { workspace = true, features = ["ethers"] }
eth-block-history.workspace = true
eth-state-fold.workspace = true
eth-state-fold-types.workspace = true
eth-tx-manager.workspace = true
ethers.workspace = true
ethers-signers = { workspace = true, features = ["aws"] }
futures.workspace = true
hyper.workspace = true
Expand Down
45 changes: 27 additions & 18 deletions offchain/eth-input-reader/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use clap::Parser;
use eth_state_client_lib::config::{
Error as SCError, SCConfig, SCEnvCLIConfig,
};
use eth_block_history::config::{BHConfig, BHEnvCLIConfig};
use eth_state_fold::config::{SFConfig, SFEnvCLIConfig};
use eth_tx_manager::{
config::{Error as TxError, TxEnvCLIConfig, TxManagerConfig},
Priority,
Expand All @@ -23,15 +22,18 @@ use types::deployment_files::{
#[command(name = "rd_config")]
#[command(about = "Configuration for rollups eth-input-reader")]
pub struct EthInputReaderEnvCLIConfig {
#[command(flatten)]
pub sc_config: SCEnvCLIConfig,

#[command(flatten)]
pub tx_config: TxEnvCLIConfig,

#[command(flatten)]
pub broker_config: BrokerCLIConfig,

#[command(flatten)]
pub sf_config: SFEnvCLIConfig,

#[command(flatten)]
pub bh_config: BHEnvCLIConfig,

/// Path to file with deployment json of dapp
#[arg(long, env, default_value = "./dapp_deployment.json")]
pub rd_dapp_deployment_file: PathBuf,
Expand All @@ -43,25 +45,28 @@ pub struct EthInputReaderEnvCLIConfig {
/// Duration of rollups epoch in seconds, for which eth-input-reader will make claims.
#[arg(long, env, default_value = "604800")]
pub rd_epoch_duration: u64,

/// Depth on the blockchain the reader will be listening to
#[arg(long, env)]
pub subscription_depth: Option<usize>,
}

#[derive(Clone, Debug)]
pub struct EthInputReaderConfig {
pub sc_config: SCConfig,
pub tx_config: TxManagerConfig,
pub broker_config: BrokerConfig,
pub sf_config: SFConfig,
pub bh_config: BHConfig,

pub dapp_deployment: DappDeployment,
pub rollups_deployment: RollupsDeployment,
pub epoch_duration: u64,
pub priority: Priority,
pub subscription_depth: usize,
}

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("StateClient configuration error: {}", source))]
StateClientError { source: SCError },

#[snafu(display("TxManager configuration error: {}", source))]
TxManagerError { source: TxError },

Expand All @@ -84,6 +89,8 @@ pub enum Error {
RollupsJsonParseError { source: serde_json::Error },
}

const DEFAULT_SUBSCRIPTION_DEPTH: usize = 7;

#[derive(Debug)]
pub struct Config {
pub eth_input_reader_config: EthInputReaderConfig,
Expand All @@ -97,13 +104,14 @@ impl Config {
"eth_input_reader",
);

let sc_config = SCConfig::initialize(eth_input_reader_config.sc_config)
.context(StateClientSnafu)?;

let tx_config =
TxManagerConfig::initialize(eth_input_reader_config.tx_config)
.context(TxManagerSnafu)?;

let sf_config = SFConfig::initialize(eth_input_reader_config.sf_config);

let bh_config = BHConfig::initialize(eth_input_reader_config.bh_config);

let path = eth_input_reader_config.rd_dapp_deployment_file;
let dapp_deployment: DappDeployment = read_json(path)?;

Expand All @@ -114,19 +122,20 @@ impl Config {
let broker_config =
BrokerConfig::from(eth_input_reader_config.broker_config);

assert!(
sc_config.default_confirmations < tx_config.default_confirmations,
"`state-client confirmations` has to be less than `tx-manager confirmations,`"
);
let subscription_depth = eth_input_reader_config
.subscription_depth
.unwrap_or(DEFAULT_SUBSCRIPTION_DEPTH);

let eth_input_reader_config = EthInputReaderConfig {
sc_config,
tx_config,
broker_config,
sf_config,
bh_config,
dapp_deployment,
rollups_deployment,
epoch_duration: eth_input_reader_config.rd_epoch_duration,
priority: Priority::Normal,
subscription_depth,
};

Ok(Config {
Expand Down
6 changes: 3 additions & 3 deletions offchain/eth-input-reader/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use axum::http::uri::InvalidUri;
use eth_state_client_lib::error::StateServerError;
use snafu::Snafu;
use std::net::AddrParseError;
use tonic::transport::Error as TonicError;
use url::ParseError;

use crate::machine;

Expand All @@ -29,8 +29,8 @@ pub enum EthInputReaderError {
#[snafu(display("connection error"))]
ConnectError { source: TonicError },

#[snafu(display("state server error"))]
StateServerError { source: StateServerError },
#[snafu(display("parser error"))]
ParseError { source: ParseError },

#[snafu(whatever, display("{message}"))]
Whatever {
Expand Down
65 changes: 42 additions & 23 deletions offchain/eth-input-reader/src/eth_input_reader.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use eth_state_client_lib::StateServer;
use eth_state_fold::{Foldable, StateFoldEnvironment};
use eth_state_fold_types::{Block, BlockStreamItem};
use rollups_events::{Address, DAppMetadata};
use std::sync::{Arc, Mutex};
use tokio_stream::StreamExt;
use tracing::{error, info, instrument, trace, warn};
use types::foldables::authority::rollups::{RollupsInitialState, RollupsState};
use types::UserData;

use crate::{
config::EthInputReaderConfig,
error::{BrokerSnafu, EthInputReaderError, StateServerSnafu},
error::{BrokerSnafu, EthInputReaderError},
machine::{
driver::MachineDriver, rollups_broker::BrokerFacade, BrokerReceive,
BrokerSend, Context,
},
metrics::EthInputReaderMetrics,
setup::{create_block_subscription, create_context, create_state_server},
setup::{
create_block_subscriber, create_context, create_env, create_provider,
create_subscription, InputProvider,
},
};

use snafu::{whatever, ResultExt};
Expand All @@ -33,13 +38,17 @@ pub async fn start(
dapp_address: Address::new(config.dapp_deployment.dapp_address.into()),
};

trace!("Creating state-server connection");
let state_server = create_state_server(&config.sc_config).await?;
trace!("Creating provider");
let provider = create_provider(&config).await?;

trace!("Creating block-subscriber");
let block_subscriber =
create_block_subscriber(&config, Arc::clone(&provider)).await?;

trace!("Starting block subscription with confirmations");
let mut block_subscription = create_block_subscription(
&state_server,
config.sc_config.default_confirmations,
let mut block_subscription = create_subscription(
Arc::clone(&block_subscriber),
config.subscription_depth,
)
.await?;

Expand All @@ -49,10 +58,23 @@ pub async fn start(
.await
.context(BrokerSnafu)?;

trace!("Creating env");
let env = create_env(
&config,
Arc::clone(&provider),
Arc::clone(&block_subscriber.block_archive),
)
.await?;

trace!("Creating context");
let mut context =
create_context(&config, &state_server, &broker, dapp_metadata, metrics)
.await?;
let mut context = create_context(
&config,
Arc::clone(&block_subscriber),
&broker,
dapp_metadata,
metrics,
)
.await?;

trace!("Creating machine driver and blockchain driver");
let mut machine_driver =
Expand All @@ -75,8 +97,8 @@ pub async fn start(
b.parent_hash
);
process_block(
&Arc::clone(&env),
&b,
&state_server,
&initial_state,
&mut context,
&mut machine_driver,
Expand Down Expand Up @@ -114,24 +136,21 @@ pub async fn start(
#[instrument(level = "trace", skip_all)]
#[allow(clippy::too_many_arguments)]
async fn process_block(
env: &Arc<StateFoldEnvironment<InputProvider, Mutex<UserData>>>,
block: &Block,

state_server: &impl StateServer<
InitialState = RollupsInitialState,
State = RollupsState,
>,
initial_state: &RollupsInitialState,

context: &mut Context,
machine_driver: &mut MachineDriver,

broker: &(impl BrokerSend + BrokerReceive),
) -> Result<(), EthInputReaderError> {
trace!("Querying rollup state");
let state = state_server
.query_state(initial_state, block.hash)
.await
.context(StateServerSnafu)?;
let state = RollupsState::get_state_for_block(
&Arc::new(initial_state.to_owned()),
block,
env,
)
.await
.expect("should get state");

// Drive machine
trace!("Reacting to state with `machine_driver`");
Expand Down
3 changes: 2 additions & 1 deletion offchain/eth-input-reader/src/machine/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use super::Context;

use super::{rollups_broker::BrokerFacadeError, BrokerSend};

use eth_state_fold_types::{ethereum_types::Address, Block};
use eth_state_fold_types::Block;
use ethers::types::Address;
use types::foldables::input_box::{DAppInputBox, Input, InputBox};

use tracing::{debug, instrument, trace};
Expand Down
Loading

0 comments on commit a81e2ba

Please sign in to comment.