From a39bab3bc063ce93e45ab3b19ae1f9bef04a8903 Mon Sep 17 00:00:00 2001 From: Maximo Palopoli <96491141+maximopalopoli@users.noreply.github.com> Date: Tue, 29 Oct 2024 15:25:40 -0300 Subject: [PATCH 1/3] feat(levm): add contract creation (#990) **Motivation** Creation transaction is not supported, and it's essential to execute the EVM. **Description** Some functionality has been taken from `create()` method of VM. Maybe a refactor can be done in that sense. In case of error I used the `VMerror` enum, but a future implementation may use other structure --- crates/vm/levm/.gitignore | 2 + crates/vm/levm/src/constants.rs | 2 + crates/vm/levm/src/errors.rs | 6 +- crates/vm/levm/src/utils.rs | 9 +- crates/vm/levm/src/vm.rs | 217 +++++++++++++++++++++++++++++--- crates/vm/levm/tests/tests.rs | 72 +++++++---- 6 files changed, 263 insertions(+), 45 deletions(-) diff --git a/crates/vm/levm/.gitignore b/crates/vm/levm/.gitignore index 335ec9573..adb63666b 100644 --- a/crates/vm/levm/.gitignore +++ b/crates/vm/levm/.gitignore @@ -1 +1,3 @@ *.tar.gz + +tests/ef_testcases diff --git a/crates/vm/levm/src/constants.rs b/crates/vm/levm/src/constants.rs index c2f8ae1b7..c38f3f429 100644 --- a/crates/vm/levm/src/constants.rs +++ b/crates/vm/levm/src/constants.rs @@ -115,6 +115,8 @@ pub const TX_BASE_COST: U256 = U256([21000, 0, 0, 0]); pub const MAX_CODE_SIZE: usize = 0x6000; pub const MAX_CREATE_CODE_SIZE: usize = 2 * MAX_CODE_SIZE; +pub const INVALID_CONTRACT_PREFIX: u8 = 0xef; + // Costs in gas for init word and init code (in wei) pub const INIT_WORD_COST: i64 = 2; diff --git a/crates/vm/levm/src/errors.rs b/crates/vm/levm/src/errors.rs index 12c107da1..c3f372276 100644 --- a/crates/vm/levm/src/errors.rs +++ b/crates/vm/levm/src/errors.rs @@ -22,10 +22,14 @@ pub enum VMError { MissingBlobHashes, BlobHashIndexOutOfBounds, RevertOpcode, - SenderAccountDoesNotExist, + AddressDoesNotMatchAnAccount, SenderAccountShouldNotHaveBytecode, SenderBalanceShouldContainTransferValue, GasPriceIsLowerThanBaseFee, + AddressAlreadyOccupied, + ContractOutputTooBig, + InvalidInitialByte, + NonceOverflow, } pub enum OpcodeSuccess { diff --git a/crates/vm/levm/src/utils.rs b/crates/vm/levm/src/utils.rs index 599b3efea..c80aa682a 100644 --- a/crates/vm/levm/src/utils.rs +++ b/crates/vm/levm/src/utils.rs @@ -46,7 +46,7 @@ pub fn new_vm_with_ops_addr_bal(bytecode: Bytes, address: Address, balance: U256 ), ]; - let state = Db { + let mut state = Db { accounts: accounts.into(), block_hashes: Default::default(), }; @@ -56,7 +56,7 @@ pub fn new_vm_with_ops_addr_bal(bytecode: Bytes, address: Address, balance: U256 // add the account passed by parameter VM::new( - Address::from_low_u64_be(42), + Some(Address::from_low_u64_be(42)), address, Default::default(), Default::default(), @@ -68,9 +68,12 @@ pub fn new_vm_with_ops_addr_bal(bytecode: Bytes, address: Address, balance: U256 U256::one(), Default::default(), Default::default(), - state, + &mut state, Default::default(), Default::default(), Default::default(), + Default::default(), + None, ) + .unwrap() } diff --git a/crates/vm/levm/src/vm.rs b/crates/vm/levm/src/vm.rs index ae305f6d9..952e1ba5a 100644 --- a/crates/vm/levm/src/vm.rs +++ b/crates/vm/levm/src/vm.rs @@ -201,9 +201,8 @@ pub fn word_to_address(word: U256) -> Address { } impl VM { - // TODO: Refactor this. #[allow(clippy::too_many_arguments)] - pub fn new( + fn call_type_transaction( to: Address, msg_sender: Address, value: U256, @@ -220,23 +219,13 @@ impl VM { block_blob_gas_used: Option, block_excess_blob_gas: Option, tx_blob_hashes: Option>, - ) -> Self { - // TODO: This handles only CALL transactions. + ) -> Result { let bytecode = db.get_account_bytecode(&to); - // TODO: This handles only CALL transactions. - // TODO: Remove this allow when CREATE is implemented. - #[allow(clippy::redundant_locals)] - let to = to; - - // TODO: In CALL this is the `to`, in CREATE it is not. - let code_addr = to; - - // TODO: this is mostly placeholder let initial_call_frame = CallFrame::new( msg_sender, to, - code_addr, + to, None, bytecode, value, @@ -264,11 +253,204 @@ impl VM { tx_blob_hashes, }; - Self { + Ok(VM { call_frames: vec![initial_call_frame], db, env, accrued_substate: Substate::default(), + }) + } + + // Functionality should be: + // (1) Check whether caller has enough balance to make a transfer + // (2) Derive the new contract’s address from the caller’s address (passing in the creator account’s nonce) + // (3) Create the new contract account using the derived contract address (changing the “world state” StateDB) + // (4) Transfer the initial Ether endowment from caller to the new contract + // (5) Set input data as contract’s deploy code, then execute it with EVM. The ret variable is the returned contract code + // (6) Check for error. Or if the contract code is too big, fail. Charge the user gas then set the contract code + // Source: https://medium.com/@hayeah/diving-into-the-ethereum-vm-part-5-the-smart-contract-creation-process-cb7b6133b855 + #[allow(clippy::too_many_arguments)] + fn create_type_transaction( + sender: Address, + secret_key: H256, + db: &mut Db, + value: U256, + calldata: Bytes, + block_number: U256, + coinbase: Address, + timestamp: U256, + prev_randao: Option, + chain_id: U256, + base_fee_per_gas: U256, + gas_price: U256, + block_blob_gas_used: Option, + block_excess_blob_gas: Option, + tx_blob_hashes: Option>, + salt: Option, + ) -> Result { + let mut db_copy = db.clone(); + let mut sender_account = match db_copy.accounts.get(&sender) { + Some(acc) => acc, + None => { + return Err(VMError::OutOfGas); + } + } + .clone(); + + // (1) + if sender_account.balance < value { + return Err(VMError::OutOfGas); // Maybe a more personalized error + } + + sender_account.nonce = sender_account + .nonce + .checked_add(1) + .ok_or(VMError::NonceOverflow)?; + + // (2) + let new_contract_address = match salt { + Some(salt) => VM::calculate_create2_address(sender, &calldata, salt), + None => VM::calculate_create_address(sender, sender_account.nonce), + }; + + // If address is already in db, there's an error + if db_copy.accounts.contains_key(&new_contract_address) { + return Err(VMError::AddressAlreadyOccupied); + } + + // (3) + let mut created_contract = Account::new( + new_contract_address, + value, + calldata.clone(), + 1, + Default::default(), + ); + db_copy.add_account(new_contract_address, created_contract.clone()); + + // (4) + sender_account.balance -= value; + created_contract.balance += value; + + // (5) + let code: Bytes = calldata.clone(); + + // Call the contract + let mut vm = VM::new( + Some(created_contract.address), + sender, + value, + code, + sender_account.balance, + block_number, + coinbase, + timestamp, + prev_randao, + chain_id, + base_fee_per_gas, + gas_price, + &mut db_copy, + block_blob_gas_used, + block_excess_blob_gas, + tx_blob_hashes, + secret_key, + None, + )?; + + let res = vm.transact()?; + // Don't use a revert bc work with clones, so don't have to save previous state + + let contract_code = res.output; + + // (6) + if contract_code.len() > MAX_CODE_SIZE { + return Err(VMError::ContractOutputTooBig); + } + // Supposing contract code has contents + if contract_code[0] == INVALID_CONTRACT_PREFIX { + return Err(VMError::InvalidInitialByte); + } + + // If the initialization code completes successfully, a final contract-creation cost is paid, + // the code-deposit cost, c, proportional to the size of the created contract’s code + let creation_cost = 200 * contract_code.len(); + + sender_account.balance = sender_account + .balance + .checked_sub(U256::from(creation_cost)) + .ok_or(VMError::OutOfGas)?; + + created_contract.bytecode = contract_code; + + let mut acc = db_copy.accounts.get_mut(&sender).unwrap(); + *acc = sender_account; + acc = db_copy.accounts.get_mut(&new_contract_address).unwrap(); + *acc = created_contract; + + *db = db_copy; + Ok(vm) + } + + // TODO: Refactor this. + #[allow(clippy::too_many_arguments)] + pub fn new( + to: Option
, + msg_sender: Address, + value: U256, + calldata: Bytes, + gas_limit: U256, + block_number: U256, + coinbase: Address, + timestamp: U256, + prev_randao: Option, + chain_id: U256, + base_fee_per_gas: U256, + gas_price: U256, + db: &mut Db, + block_blob_gas_used: Option, + block_excess_blob_gas: Option, + tx_blob_hashes: Option>, + secret_key: H256, + salt: Option, + ) -> Result { + // Maybe this desicion should be made in an upper layer + match to { + Some(address) => VM::call_type_transaction( + address, + msg_sender, + value, + calldata, + gas_limit, + block_number, + coinbase, + timestamp, + prev_randao, + chain_id, + base_fee_per_gas, + gas_price, + db.clone(), + block_blob_gas_used, + block_excess_blob_gas, + tx_blob_hashes, + ), + None => VM::create_type_transaction( + msg_sender, + secret_key, + db, + value, + calldata, + block_number, + coinbase, + timestamp, + prev_randao, + chain_id, + base_fee_per_gas, + gas_price, + block_blob_gas_used, + block_excess_blob_gas, + tx_blob_hashes, + salt, + ), } } @@ -415,7 +597,6 @@ impl VM { } } - // let account = self.db.accounts.get(&self.env.origin).unwrap(); /// Based on Ethereum yellow paper's initial tests of intrinsic validity (Section 6). The last version is /// Shanghai, so there are probably missing Cancun validations. The intrinsic validations are: /// @@ -437,7 +618,9 @@ impl VM { // Validations (1), (2), (3), (5), and (8) are assumed done in upper layers. let sender_account = match self.db.accounts.get(&self.env.origin) { Some(acc) => acc, - None => return Err(VMError::SenderAccountDoesNotExist), + None => return Err(VMError::AddressDoesNotMatchAnAccount), + // This is a check for completeness. However if it were a none and + // it was not caught it would be caught in clause 6. }; // (4) if sender_account.has_code() { diff --git a/crates/vm/levm/tests/tests.rs b/crates/vm/levm/tests/tests.rs index 4ddd2e8ee..982f2bcc8 100644 --- a/crates/vm/levm/tests/tests.rs +++ b/crates/vm/levm/tests/tests.rs @@ -3845,7 +3845,7 @@ fn caller_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), caller, Default::default(), Default::default(), @@ -3857,11 +3857,14 @@ fn caller_op() { Default::default(), Default::default(), Default::default(), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); @@ -3887,7 +3890,7 @@ fn origin_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), msg_sender, Default::default(), Default::default(), @@ -3899,11 +3902,14 @@ fn origin_op() { Default::default(), Default::default(), Default::default(), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); @@ -3955,7 +3961,7 @@ fn address_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), Default::default(), Default::default(), Default::default(), @@ -3967,11 +3973,14 @@ fn address_op() { Default::default(), Default::default(), Default::default(), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); @@ -3999,7 +4008,7 @@ fn selfbalance_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), Default::default(), Default::default(), Default::default(), @@ -4011,11 +4020,14 @@ fn selfbalance_op() { Default::default(), Default::default(), Default::default(), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); @@ -4039,7 +4051,7 @@ fn callvalue_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), Default::default(), value, Default::default(), @@ -4051,11 +4063,14 @@ fn callvalue_op() { Default::default(), Default::default(), Default::default(), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); @@ -4078,7 +4093,7 @@ fn codesize_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), Default::default(), Default::default(), Default::default(), @@ -4090,11 +4105,14 @@ fn codesize_op() { Default::default(), Default::default(), Default::default(), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); @@ -4119,7 +4137,7 @@ fn gasprice_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), Default::default(), Default::default(), Default::default(), @@ -4131,11 +4149,14 @@ fn gasprice_op() { Default::default(), Default::default(), U256::from(0x9876), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); @@ -4177,7 +4198,7 @@ fn codecopy_op() { ); let mut vm = VM::new( - address_that_has_the_code, + Some(address_that_has_the_code), Default::default(), Default::default(), Default::default(), @@ -4189,11 +4210,14 @@ fn codecopy_op() { Default::default(), Default::default(), Default::default(), - db, + &mut db, Default::default(), Default::default(), Default::default(), - ); + Default::default(), + None, + ) + .unwrap(); let mut current_call_frame = vm.call_frames.pop().unwrap(); vm.execute(&mut current_call_frame); From 67764fc07d87d816a1db7de0a44a1fccb36f48ea Mon Sep 17 00:00:00 2001 From: Ivan Litteri <67517699+ilitteri@users.noreply.github.com> Date: Tue, 29 Oct 2024 15:40:20 -0300 Subject: [PATCH 2/3] feat(l2): `make restart` does not clean contract deps (#1010) --- crates/l2/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/l2/Makefile b/crates/l2/Makefile index 3fe872317..23914ec55 100644 --- a/crates/l2/Makefile +++ b/crates/l2/Makefile @@ -13,7 +13,7 @@ down: down-local-l1 down-l2 ## 🛑 Shuts down the localnet clean: clean-contract-deps ## 🧹 Cleans the localnet -restart: restart-local-l1 restart-contract-deps deploy-l1 restart-l2 ## 🔄 Restarts the localnet +restart: restart-local-l1 deploy-l1 restart-l2 ## 🔄 Restarts the localnet cli: ## 🛠️ Installs the L2 Lambda Ethereum Rust CLI cargo install --path ${ETHEREUM_RUST_PATH}/cmd/ethereum_rust_l2/ --force From ac7559aa8292feeccfe1838a292c20c16167cde8 Mon Sep 17 00:00:00 2001 From: ElFantasma Date: Tue, 29 Oct 2024 15:41:18 -0300 Subject: [PATCH 3/3] feat(l1): rlpx peer listen loop (#980) **Motivation** A p2p RLPx connection is maintained per connected peer. A listen loop is required to listen for requests and send messages **Description** - Added basic listen loop to start listening to expected messages - Added capabilities initialization to send initial messages per capability Closes #840 --- cmd/ethereum_rust/ethereum_rust.rs | 14 +- crates/blockchain/error.rs | 10 +- crates/networking/p2p/net.rs | 36 ++-- crates/networking/p2p/rlpx/connection.rs | 165 ++++++++++-------- crates/networking/p2p/rlpx/error.rs | 7 +- crates/networking/p2p/rlpx/eth.rs | 116 +----------- crates/networking/p2p/rlpx/eth/backend.rs | 29 +++ crates/networking/p2p/rlpx/eth/receipts.rs | 4 +- crates/networking/p2p/rlpx/eth/status.rs | 96 ++++++++++ .../networking/p2p/rlpx/eth/transactions.rs | 4 +- crates/networking/p2p/rlpx/handshake.rs | 22 +-- crates/networking/p2p/rlpx/message.rs | 15 +- crates/networking/p2p/rlpx/p2p.rs | 38 +++- 13 files changed, 318 insertions(+), 238 deletions(-) create mode 100644 crates/networking/p2p/rlpx/eth/backend.rs create mode 100644 crates/networking/p2p/rlpx/eth/status.rs diff --git a/cmd/ethereum_rust/ethereum_rust.rs b/cmd/ethereum_rust/ethereum_rust.rs index 4f10fd1bc..ff1e49803 100644 --- a/cmd/ethereum_rust/ethereum_rust.rs +++ b/cmd/ethereum_rust/ethereum_rust.rs @@ -1,6 +1,7 @@ use bytes::Bytes; use directories::ProjectDirs; use ethereum_rust_blockchain::add_block; +use ethereum_rust_blockchain::fork_choice::apply_fork_choice; use ethereum_rust_core::types::{Block, Genesis}; use ethereum_rust_core::H256; use ethereum_rust_net::bootnode::BootNode; @@ -123,19 +124,24 @@ async fn main() { if let Some(chain_rlp_path) = matches.get_one::("import") { let blocks = read_chain_file(chain_rlp_path); let size = blocks.len(); - for block in blocks { + for block in &blocks { let hash = block.header.compute_block_hash(); info!( "Adding block {} with hash {:#x}.", block.header.number, hash ); - if add_block(&block, &store).is_err() { + let result = add_block(block, &store); + if let Some(error) = result.err() { warn!( - "Failed to add block {} with hash {:#x}.", - block.header.number, hash + "Failed to add block {} with hash {:#x}: {}.", + block.header.number, hash, error ); } } + if let Some(last_block) = blocks.last() { + let hash = last_block.header.compute_block_hash(); + apply_fork_choice(&store, hash, hash, hash).unwrap(); + } info!("Added {} blocks to blockchain", size); } let jwt_secret = read_jwtsecret_file(authrpc_jwtsecret); diff --git a/crates/blockchain/error.rs b/crates/blockchain/error.rs index e2e7c57ec..9446795b4 100644 --- a/crates/blockchain/error.rs +++ b/crates/blockchain/error.rs @@ -1,10 +1,8 @@ -use thiserror::Error; - use ethereum_rust_core::types::InvalidBlockHeaderError; use ethereum_rust_storage::error::StoreError; use ethereum_rust_vm::EvmError; -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] pub enum ChainError { #[error("Invalid Block: {0}")] InvalidBlock(#[from] InvalidBlockError), @@ -20,7 +18,7 @@ pub enum ChainError { EvmError(#[from] EvmError), } -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] pub enum InvalidBlockError { #[error("World State Root does not match the one in the header after executing")] StateRootMismatch, @@ -36,7 +34,7 @@ pub enum InvalidBlockError { BlobGasUsedMismatch, } -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] pub enum MempoolError { #[error("No block header")] NoBlockHeaderError, @@ -67,7 +65,7 @@ pub enum ForkChoiceElement { Finalized, } -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] pub enum InvalidForkChoice { #[error("DB error: {0}")] StoreError(#[from] StoreError), diff --git a/crates/networking/p2p/net.rs b/crates/networking/p2p/net.rs index ff1a42300..fd512c138 100644 --- a/crates/networking/p2p/net.rs +++ b/crates/networking/p2p/net.rs @@ -50,6 +50,7 @@ pub async fn start_network( let discovery_handle = tokio::spawn(discover_peers( udp_addr, signer.clone(), + storage.clone(), table.clone(), bootnodes, )); @@ -66,6 +67,7 @@ pub async fn start_network( async fn discover_peers( udp_addr: SocketAddr, signer: SigningKey, + storage: Store, table: Arc>, bootnodes: Vec, ) { @@ -74,6 +76,7 @@ async fn discover_peers( let server_handler = tokio::spawn(discover_peers_server( udp_addr, udp_socket.clone(), + storage, table.clone(), signer.clone(), )); @@ -111,6 +114,7 @@ async fn discover_peers( async fn discover_peers_server( udp_addr: SocketAddr, udp_socket: Arc, + storage: Store, table: Arc>, signer: SigningKey, ) { @@ -196,9 +200,10 @@ async fn discover_peers_server( let mut msg_buf = vec![0; read - 32]; buf[32..read].clone_into(&mut msg_buf); - let signer_clone = signer.clone(); + let signer = signer.clone(); + let storage = storage.clone(); tokio::spawn(async move { - handle_peer_as_initiator(signer_clone, &msg_buf, &peer.node, table) + handle_peer_as_initiator(signer, &msg_buf, &peer.node, storage, table) .await; }); } else { @@ -724,13 +729,10 @@ async fn pong(socket: &UdpSocket, to_addr: SocketAddr, ping_hash: H256, signer: let _ = socket.send_to(&buf, to_addr).await; } -// TODO build a proper listen loop that receives requests from both -// peers and business layer and propagate storage to use when required -// https://github.com/lambdaclass/lambda_ethereum_rust/issues/840 async fn serve_requests( tcp_addr: SocketAddr, signer: SigningKey, - _storage: Store, + storage: Store, table: Arc>, ) { let tcp_socket = TcpSocket::new_v4().unwrap(); @@ -742,6 +744,7 @@ async fn serve_requests( tokio::spawn(handle_peer_as_receiver( signer.clone(), stream, + storage.clone(), table.clone(), )); } @@ -750,9 +753,10 @@ async fn serve_requests( async fn handle_peer_as_receiver( signer: SigningKey, stream: TcpStream, + storage: Store, table: Arc>, ) { - let conn = RLPxConnection::receiver(signer, stream); + let conn = RLPxConnection::receiver(signer, stream, storage); handle_peer(conn, table).await; } @@ -760,6 +764,7 @@ async fn handle_peer_as_initiator( signer: SigningKey, msg: &[u8], node: &Node, + storage: Store, table: Arc>, ) { info!("Trying RLPx connection with {node:?}"); @@ -768,19 +773,16 @@ async fn handle_peer_as_initiator( .connect(SocketAddr::new(node.ip, node.tcp_port)) .await .unwrap(); - let conn = RLPxConnection::initiator(signer, msg, stream).await; + let conn = RLPxConnection::initiator(signer, msg, stream, storage).await; handle_peer(conn, table).await; } async fn handle_peer(mut conn: RLPxConnection, table: Arc>) { match conn.handshake().await { - Ok(_) => { - // TODO Properly build listen loop - // https://github.com/lambdaclass/lambda_ethereum_rust/issues/840 - // loop { - // conn.await_messages(); - // } - } + Ok(_) => match conn.handle_peer().await { + Ok(_) => unreachable!(), + Err(e) => info!("Error during RLPx connection: ({e})"), + }, Err(e) => { // Discard peer from kademlia table info!("Handshake failed, discarding peer: ({e})"); @@ -798,6 +800,7 @@ pub fn node_id_from_signing_key(signer: &SigningKey) -> H512 { #[cfg(test)] mod tests { use super::*; + use ethereum_rust_storage::EngineType; use kademlia::bucket_number; use rand::rngs::OsRng; use std::{ @@ -844,12 +847,15 @@ mod tests { let signer = SigningKey::random(&mut OsRng); let udp_socket = Arc::new(UdpSocket::bind(addr).await.unwrap()); let node_id = node_id_from_signing_key(&signer); + let storage = + Store::new("temp.db", EngineType::InMemory).expect("Failed to create test DB"); let table = Arc::new(Mutex::new(KademliaTable::new(node_id))); if should_start_server { tokio::spawn(discover_peers_server( addr, udp_socket.clone(), + storage.clone(), table.clone(), signer.clone(), )); diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 54c948bc1..642535904 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -1,5 +1,5 @@ use crate::{ - rlpx::{handshake::encode_ack_message, message::Message, p2p, utils::id2pubkey}, + rlpx::{eth::backend, handshake::encode_ack_message, message::Message, p2p, utils::id2pubkey}, MAX_DISC_PACKET_SIZE, }; @@ -8,12 +8,14 @@ use super::{ frame, handshake::{decode_ack_message, decode_auth_message, encode_auth_message}, message as rlpx, + p2p::Capability, utils::{ecdh_xchng, pubkey2id}, }; use aes::cipher::KeyIvInit; use bytes::BufMut as _; use ethereum_rust_core::{H256, H512}; use ethereum_rust_rlp::decode::RLPDecode; +use ethereum_rust_storage::Store; use k256::{ ecdsa::{RecoveryId, Signature, SigningKey, VerifyingKey}, PublicKey, SecretKey, @@ -21,8 +23,11 @@ use k256::{ use sha3::{Digest, Keccak256}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tracing::{error, info}; -pub const SUPPORTED_CAPABILITIES: [(&str, u8); 2] = [("p2p", 5), ("eth", 68)]; -// pub const SUPPORTED_CAPABILITIES: [(&str, u8); 3] = [("p2p", 5), ("eth", 68), ("snap", 1)]; +const CAP_P2P: (Capability, u8) = (Capability::P2p, 5); +const CAP_ETH: (Capability, u8) = (Capability::Eth, 68); +//const CAP_SNAP: (Capability, u8) = (Capability::Snap, 1); +const SUPPORTED_CAPABILITIES: [(Capability, u8); 2] = [CAP_P2P, CAP_ETH]; +// pub const SUPPORTED_CAPABILITIES: [(&str, u8); 3] = [CAP_P2P, CAP_ETH, CAP_SNAP)]; pub(crate) type Aes256Ctr64BE = ctr::Ctr64BE; @@ -31,20 +36,22 @@ pub(crate) struct RLPxConnection { signer: SigningKey, state: RLPxConnectionState, stream: S, - capabilities: Vec<(String, u8)>, + storage: Store, + capabilities: Vec<(Capability, u8)>, } impl RLPxConnection { - fn new(signer: SigningKey, stream: S, state: RLPxConnectionState) -> Self { + fn new(signer: SigningKey, stream: S, state: RLPxConnectionState, storage: Store) -> Self { Self { signer, state, stream, + storage, capabilities: vec![], } } - pub fn receiver(signer: SigningKey, stream: S) -> Self { + pub fn receiver(signer: SigningKey, stream: S, storage: Store) -> Self { let mut rng = rand::thread_rng(); Self::new( signer, @@ -53,10 +60,11 @@ impl RLPxConnection { H256::random_using(&mut rng), SecretKey::random(&mut rng), )), + storage, ) } - pub async fn initiator(signer: SigningKey, msg: &[u8], stream: S) -> Self { + pub async fn initiator(signer: SigningKey, msg: &[u8], stream: S, storage: Store) -> Self { let mut rng = rand::thread_rng(); let digest = Keccak256::digest(&msg[65..]); let signature = &Signature::from_bytes(msg[..64].into()).unwrap(); @@ -67,7 +75,7 @@ impl RLPxConnection { SecretKey::random(&mut rng), pubkey2id(&peer_pk.into()), )); - RLPxConnection::new(signer, stream, state) + RLPxConnection::new(signer, stream, state, storage) } pub async fn handshake(&mut self) -> Result<(), RLPxError> { @@ -89,11 +97,84 @@ impl RLPxConnection { info!("Completed handshake!"); self.exchange_hello_messages().await?; - info!("Completed Hello roundtrip!"); Ok(()) } - pub async fn send_auth(&mut self) { + pub async fn exchange_hello_messages(&mut self) -> Result<(), RLPxError> { + let hello_msg = Message::Hello(p2p::HelloMessage::new( + SUPPORTED_CAPABILITIES.to_vec(), + PublicKey::from(self.signer.verifying_key()), + )); + + self.send(hello_msg).await; + + // Receive Hello message + match self.receive().await { + Message::Hello(hello_message) => { + self.capabilities = hello_message.capabilities; + + // Check if we have any capability in common + for cap in self.capabilities.clone() { + if SUPPORTED_CAPABILITIES.contains(&cap) { + return Ok(()); + } + } + // Return error if not + Err(RLPxError::HandshakeError( + "No matching capabilities".to_string(), + )) + } + _ => { + // Fail if it is not a hello message + Err(RLPxError::HandshakeError( + "Expected Hello message".to_string(), + )) + } + } + } + + pub async fn handle_peer(&mut self) -> Result<(), RLPxError> { + self.start_capabilities().await?; + match &self.state { + RLPxConnectionState::Established(_) => { + info!("Started peer main loop"); + loop { + match self.receive().await { + // TODO: implement handlers for each message type + Message::Disconnect(_) => info!("Received Disconnect"), + Message::Ping(_) => info!("Received Ping"), + Message::Pong(_) => info!("Received Pong"), + Message::Status(_) => info!("Received Status"), + // TODO: Add new message types and handlers as they are implemented + message => return Err(RLPxError::UnexpectedMessage(message)), + }; + } + } + _ => Err(RLPxError::InvalidState( + "Invalid connection state".to_string(), + )), + } + } + + pub fn get_remote_node_id(&self) -> H512 { + match &self.state { + RLPxConnectionState::Established(state) => state.remote_node_id, + // TODO proper error + _ => panic!("Invalid state"), + } + } + + async fn start_capabilities(&mut self) -> Result<(), RLPxError> { + // Sending eth Status if peer supports it + if self.capabilities.contains(&CAP_ETH) { + let status = backend::get_status(&self.storage).unwrap(); + self.send(Message::Status(status)).await; + } + // TODO: add new capabilities startup when required (eg. snap) + Ok(()) + } + + async fn send_auth(&mut self) { match &self.state { RLPxConnectionState::Initiator(initiator_state) => { let secret_key: SecretKey = self.signer.clone().into(); @@ -109,7 +190,6 @@ impl RLPxConnection { auth_message.put_slice(&msg); self.stream.write_all(&auth_message).await.unwrap(); - info!("Sent auth message correctly!"); self.state = RLPxConnectionState::InitiatedAuth(InitiatedAuth::new( initiator_state, @@ -121,24 +201,20 @@ impl RLPxConnection { }; } - pub async fn send_ack(&mut self) { + async fn send_ack(&mut self) { match &self.state { RLPxConnectionState::ReceivedAuth(received_auth_state) => { - let secret_key: SecretKey = self.signer.clone().into(); let peer_pk = id2pubkey(received_auth_state.remote_node_id).unwrap(); let mut ack_message = vec![]; let msg = encode_ack_message( - &secret_key, &received_auth_state.local_ephemeral_key, received_auth_state.local_nonce, &peer_pk, - &received_auth_state.remote_ephemeral_key, ); ack_message.put_slice(&msg); self.stream.write_all(&ack_message).await.unwrap(); - info!("Sent ack message correctly!"); self.state = RLPxConnectionState::Established(Box::new(Established::for_receiver( received_auth_state, @@ -150,7 +226,7 @@ impl RLPxConnection { }; } - pub async fn receive_auth(&mut self) { + async fn receive_auth(&mut self) { match &self.state { RLPxConnectionState::Receiver(receiver_state) => { let secret_key: SecretKey = self.signer.clone().into(); @@ -169,7 +245,6 @@ impl RLPxConnection { let auth_bytes = &buf[..msg_size + 2]; let msg = &buf[2..msg_size + 2]; let (auth, remote_ephemeral_key) = decode_auth_message(&secret_key, msg, auth_data); - info!("Received auth message correctly!"); // Build next state self.state = RLPxConnectionState::ReceivedAuth(ReceivedAuth::new( @@ -185,7 +260,7 @@ impl RLPxConnection { }; } - pub async fn receive_ack(&mut self) { + async fn receive_ack(&mut self) { match &self.state { RLPxConnectionState::InitiatedAuth(initiated_auth_state) => { let secret_key: SecretKey = self.signer.clone().into(); @@ -205,7 +280,6 @@ impl RLPxConnection { let msg = &buf[2..msg_size + 2]; let ack = decode_ack_message(&secret_key, msg, ack_data); let remote_ephemeral_key = ack.get_ephemeral_pubkey().unwrap(); - info!("Received ack message correctly!"); // Build next state self.state = RLPxConnectionState::Established(Box::new(Established::for_initiator( @@ -220,46 +294,7 @@ impl RLPxConnection { }; } - pub async fn exchange_hello_messages(&mut self) -> Result<(), RLPxError> { - let supported_capabilities: Vec<(String, u8)> = SUPPORTED_CAPABILITIES - .into_iter() - .map(|(name, version)| (name.to_string(), version)) - .collect(); - let hello_msg = Message::Hello(p2p::HelloMessage::new( - supported_capabilities.clone(), - PublicKey::from(self.signer.verifying_key()), - )); - - self.send(hello_msg).await; - info!("Hello message sent!"); - - // Receive Hello message - match self.receive().await { - Message::Hello(hello_message) => { - info!("Hello message received {hello_message:?}"); - self.capabilities = hello_message.capabilities; - - // Check if we have any capability in common - for cap in self.capabilities.clone() { - if supported_capabilities.contains(&cap) { - return Ok(()); - } - } - // Return error if not - Err(RLPxError::HandshakeError( - "No matching capabilities".to_string(), - )) - } - _ => { - // Fail if it is not a hello message - Err(RLPxError::HandshakeError( - "Expected Hello message".to_string(), - )) - } - } - } - - pub async fn send(&mut self, message: rlpx::Message) { + async fn send(&mut self, message: rlpx::Message) { match &mut self.state { RLPxConnectionState::Established(state) => { let mut frame_buffer = vec![]; @@ -277,7 +312,7 @@ impl RLPxConnection { } } - pub async fn receive(&mut self) -> rlpx::Message { + async fn receive(&mut self) -> rlpx::Message { match &mut self.state { RLPxConnectionState::Established(state) => { let frame_data = frame::read(state, &mut self.stream).await; @@ -289,14 +324,6 @@ impl RLPxConnection { _ => panic!("Received an unexpected message"), } } - - pub fn get_remote_node_id(&self) -> H512 { - match &self.state { - RLPxConnectionState::Established(state) => state.remote_node_id, - // TODO proper error - _ => panic!("Invalid state"), - } - } } enum RLPxConnectionState { diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index e74d4339a..4177ea10f 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -1,8 +1,13 @@ +use crate::rlpx::message::Message; use thiserror::Error; // TODO improve errors #[derive(Debug, Error)] -pub enum RLPxError { +pub(crate) enum RLPxError { #[error("{0}")] HandshakeError(String), + #[error("{0}")] + InvalidState(String), + #[error("Unexpected message: {0}")] + UnexpectedMessage(Message), } diff --git a/crates/networking/p2p/rlpx/eth.rs b/crates/networking/p2p/rlpx/eth.rs index 52387e6aa..a03ff256c 100644 --- a/crates/networking/p2p/rlpx/eth.rs +++ b/crates/networking/p2p/rlpx/eth.rs @@ -1,111 +1,5 @@ -use super::{message::RLPxMessage, utils::snappy_encode}; -use bytes::BufMut; -use ethereum_rust_core::{ - types::{BlockHash, ForkId}, - U256, -}; -use ethereum_rust_rlp::{ - encode::RLPEncode, - error::{RLPDecodeError, RLPEncodeError}, - structs::{Decoder, Encoder}, -}; -use ethereum_rust_storage::{error::StoreError, Store}; -use snap::raw::Decoder as SnappyDecoder; - -pub const ETH_VERSION: u32 = 68; -pub const HASH_FIRST_BYTE_DECODER: u8 = 160; - -mod blocks; -mod receipts; -mod transactions; - -#[derive(Debug)] -pub(crate) struct StatusMessage { - eth_version: u32, - network_id: u64, - total_difficulty: U256, - block_hash: BlockHash, - genesis: BlockHash, - fork_id: ForkId, -} - -// TODO remove this allow once we construct StatusMessages -#[allow(unused)] -impl StatusMessage { - pub fn new(storage: &Store) -> Result { - let chain_config = storage.get_chain_config()?; - let total_difficulty = - U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); - let network_id = chain_config.chain_id; - - // These blocks must always be available - let genesis_header = storage.get_block_header(0)?.unwrap(); - let block_number = storage.get_latest_block_number()?.unwrap(); - let block_header = storage.get_block_header(block_number)?.unwrap(); - - let genesis = genesis_header.compute_block_hash(); - let block_hash = block_header.compute_block_hash(); - let fork_id = ForkId::new(chain_config, genesis, block_header.timestamp, block_number); - Ok(Self { - eth_version: ETH_VERSION, - network_id, - total_difficulty, - block_hash, - genesis, - fork_id, - }) - } -} - -impl RLPxMessage for StatusMessage { - fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - 16_u8.encode(buf); // msg_id - - let mut encoded_data = vec![]; - Encoder::new(&mut encoded_data) - .encode_field(&self.eth_version) - .encode_field(&self.network_id) - .encode_field(&self.total_difficulty) - .encode_field(&self.block_hash) - .encode_field(&self.genesis) - .encode_field(&self.fork_id) - .finish(); - - let msg_data = snappy_encode(encoded_data)?; - buf.put_slice(&msg_data); - Ok(()) - } - - fn decode(msg_data: &[u8]) -> Result { - let mut snappy_decoder = SnappyDecoder::new(); - let decompressed_data = snappy_decoder - .decompress_vec(msg_data) - .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; - let decoder = Decoder::new(&decompressed_data)?; - let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; - - assert_eq!(eth_version, 68, "only eth version 68 is supported"); - - let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; - - let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; - - let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; - - let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; - - let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; - - // Implementations must ignore any additional list elements - let _padding = decoder.finish_unchecked(); - - Ok(Self { - eth_version, - network_id, - total_difficulty, - block_hash, - genesis, - fork_id, - }) - } -} +pub(crate) mod backend; +pub(crate) mod blocks; +pub(crate) mod receipts; +pub(crate) mod status; +pub(crate) mod transactions; diff --git a/crates/networking/p2p/rlpx/eth/backend.rs b/crates/networking/p2p/rlpx/eth/backend.rs new file mode 100644 index 000000000..1af62214a --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/backend.rs @@ -0,0 +1,29 @@ +use ethereum_rust_core::{types::ForkId, U256}; +use ethereum_rust_storage::{error::StoreError, Store}; + +use super::status::StatusMessage; + +pub const ETH_VERSION: u32 = 68; + +pub fn get_status(storage: &Store) -> Result { + let chain_config = storage.get_chain_config()?; + let total_difficulty = U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); + let network_id = chain_config.chain_id; + + // These blocks must always be available + let genesis_header = storage.get_block_header(0)?.unwrap(); + let block_number = storage.get_latest_block_number()?.unwrap(); + let block_header = storage.get_block_header(block_number)?.unwrap(); + + let genesis = genesis_header.compute_block_hash(); + let block_hash = block_header.compute_block_hash(); + let fork_id = ForkId::new(chain_config, genesis, block_header.timestamp, block_number); + Ok(StatusMessage::new( + ETH_VERSION, + network_id, + total_difficulty, + block_hash, + genesis, + fork_id, + )) +} diff --git a/crates/networking/p2p/rlpx/eth/receipts.rs b/crates/networking/p2p/rlpx/eth/receipts.rs index 496273341..5d76a2f27 100644 --- a/crates/networking/p2p/rlpx/eth/receipts.rs +++ b/crates/networking/p2p/rlpx/eth/receipts.rs @@ -6,9 +6,7 @@ use ethereum_rust_rlp::{ }; use snap::raw::Decoder as SnappyDecoder; -use crate::rlpx::message::RLPxMessage; - -use super::snappy_encode; +use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getreceipts-0x0f #[derive(Debug)] diff --git a/crates/networking/p2p/rlpx/eth/status.rs b/crates/networking/p2p/rlpx/eth/status.rs new file mode 100644 index 000000000..9050ac433 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/status.rs @@ -0,0 +1,96 @@ +use bytes::BufMut; +use ethereum_rust_core::{ + types::{BlockHash, ForkId}, + U256, +}; +use ethereum_rust_rlp::{ + encode::RLPEncode, + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; +use snap::raw::Decoder as SnappyDecoder; + +use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; + +#[derive(Debug)] +pub(crate) struct StatusMessage { + eth_version: u32, + network_id: u64, + total_difficulty: U256, + block_hash: BlockHash, + genesis: BlockHash, + fork_id: ForkId, +} + +impl StatusMessage { + pub fn new( + eth_version: u32, + network_id: u64, + total_difficulty: U256, + block_hash: BlockHash, + genesis: BlockHash, + fork_id: ForkId, + ) -> Self { + Self { + eth_version, + network_id, + total_difficulty, + block_hash, + genesis, + fork_id, + } + } +} + +impl RLPxMessage for StatusMessage { + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + 16_u8.encode(buf); // msg_id + + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.eth_version) + .encode_field(&self.network_id) + .encode_field(&self.total_difficulty) + .encode_field(&self.block_hash) + .encode_field(&self.genesis) + .encode_field(&self.fork_id) + .finish(); + + let msg_data = snappy_encode(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let mut snappy_decoder = SnappyDecoder::new(); + let decompressed_data = snappy_decoder + .decompress_vec(msg_data) + .map_err(|e| RLPDecodeError::Custom(e.to_string()))?; + let decoder = Decoder::new(&decompressed_data)?; + let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + + assert_eq!(eth_version, 68, "only eth version 68 is supported"); + + let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; + + let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; + + let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; + + let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; + + let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; + + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self::new( + eth_version, + network_id, + total_difficulty, + block_hash, + genesis, + fork_id, + )) + } +} diff --git a/crates/networking/p2p/rlpx/eth/transactions.rs b/crates/networking/p2p/rlpx/eth/transactions.rs index 4923e3b19..d84e9c228 100644 --- a/crates/networking/p2p/rlpx/eth/transactions.rs +++ b/crates/networking/p2p/rlpx/eth/transactions.rs @@ -6,9 +6,7 @@ use ethereum_rust_rlp::{ }; use snap::raw::Decoder as SnappyDecoder; -use crate::rlpx::message::RLPxMessage; - -use super::snappy_encode; +use crate::rlpx::{message::RLPxMessage, utils::snappy_encode}; // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02 // Broadcast message diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index 679a0f8d8..c88941ca5 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -13,7 +13,6 @@ use k256::{ PublicKey, SecretKey, }; use rand::Rng; -use tracing::info; type Aes128Ctr64BE = ctr::Ctr64BE; @@ -56,39 +55,20 @@ pub(crate) fn decode_auth_message( // RLP-decode the message. let (auth, _padding) = AuthMessage::decode_unfinished(&payload).unwrap(); - info!( - "signature: {:?} node_id: {:?} nonce: {:?}", - &auth.signature, &auth.node_id, &auth.nonce - ); - - let peer_pk = id2pubkey(auth.node_id).unwrap(); - // Derive a shared secret from the static keys. + let peer_pk = id2pubkey(auth.node_id).unwrap(); let static_shared_secret = ecdh_xchng(static_key, &peer_pk); - info!("token {static_shared_secret:?}"); - let remote_ephemeral_key = retrieve_remote_ephemeral_key(static_shared_secret.into(), auth.nonce, auth.signature); - - info!("remote pub key {remote_ephemeral_key:?}"); - (auth, remote_ephemeral_key) } /// Encodes an Ack message, to complete a handshake pub fn encode_ack_message( - static_key: &SecretKey, local_ephemeral_key: &SecretKey, local_nonce: H256, remote_static_pubkey: &PublicKey, - remote_ephemeral_key: &PublicKey, ) -> Vec { - // Derive a shared secret from the static keys. - let static_shared_secret = ecdh_xchng(static_key, remote_static_pubkey); - info!("token {static_shared_secret:?}"); - - info!("remote pub key {remote_ephemeral_key:?}"); - // Compose the ack message. let ack_msg = AckMessage::new(pubkey2id(&local_ephemeral_key.public_key()), local_nonce); diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index e94b2ccb8..8f06159be 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -1,7 +1,8 @@ use bytes::BufMut; use ethereum_rust_rlp::error::{RLPDecodeError, RLPEncodeError}; +use std::fmt::Display; -use super::eth::StatusMessage; +use super::eth::status::StatusMessage; use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage}; pub trait RLPxMessage: Sized { @@ -40,3 +41,15 @@ impl Message { } } } + +impl Display for Message { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Message::Hello(_) => "p2p:Hello".fmt(f), + Message::Disconnect(_) => "p2p:Disconnect".fmt(f), + Message::Ping(_) => "p2p:Ping".fmt(f), + Message::Pong(_) => "p2p:Pong".fmt(f), + Message::Status(_) => "eth:Status".fmt(f), + } + } +} diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index 852e66545..4521c5bef 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -1,7 +1,8 @@ use bytes::BufMut; use ethereum_rust_core::H512; use ethereum_rust_rlp::{ - encode::RLPEncode as _, + decode::RLPDecode, + encode::RLPEncode, error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; @@ -15,14 +16,43 @@ use super::{ utils::{pubkey2id, snappy_encode}, }; +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum Capability { + P2p, + Eth, + Snap, +} + +impl RLPEncode for Capability { + fn encode(&self, buf: &mut dyn BufMut) { + match self { + Self::P2p => "p2p".encode(buf), + Self::Eth => "eth".encode(buf), + Self::Snap => "snap".encode(buf), + } + } +} + +impl RLPDecode for Capability { + fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { + let (cap_string, rest) = String::decode_unfinished(rlp)?; + match cap_string.as_str() { + "p2p" => Ok((Capability::P2p, rest)), + "eth" => Ok((Capability::Eth, rest)), + "snap" => Ok((Capability::Snap, rest)), + _ => Err(RLPDecodeError::UnexpectedString), + } + } +} + #[derive(Debug)] pub(crate) struct HelloMessage { - pub(crate) capabilities: Vec<(String, u8)>, + pub(crate) capabilities: Vec<(Capability, u8)>, pub(crate) node_id: PublicKey, } impl HelloMessage { - pub fn new(capabilities: Vec<(String, u8)>, node_id: PublicKey) -> Self { + pub fn new(capabilities: Vec<(Capability, u8)>, node_id: PublicKey) -> Self { Self { capabilities, node_id, @@ -55,7 +85,7 @@ impl RLPxMessage for HelloMessage { // TODO: store client id for debugging purposes // [[cap1, capVersion1], [cap2, capVersion2], ...] - let (capabilities, decoder): (Vec<(String, u8)>, _) = + let (capabilities, decoder): (Vec<(Capability, u8)>, _) = decoder.decode_field("capabilities").unwrap(); // This field should be ignored