Skip to content

Commit

Permalink
RPC client for fetching tx lists (#23)
Browse files Browse the repository at this point in the history
* RPC client for fetching tx lists

RPC server for tests

* Improve async processing and error handling

* More assert to  the gen l2 tx test

---------

Co-authored-by: Denis Kolodin <[email protected]>
Co-authored-by: Ahmad Bitar <[email protected]>
  • Loading branch information
3 people authored Jun 24, 2024
1 parent bfd9ba1 commit 463204c
Show file tree
Hide file tree
Showing 8 changed files with 1,372 additions and 142 deletions.
1,234 changes: 1,139 additions & 95 deletions Node/Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ edition = "2021"
[dependencies]
tokio = { version = "1.38", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = "0.3"
tracing-subscriber = "0.3"
jsonrpsee = { version = "0.23", features = ["http-client", "server"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
lazy_static = "1.4"
anyhow = "1.0.86"
7 changes: 5 additions & 2 deletions Node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ mod mev_boost;
mod node;
mod p2p_network;
mod taiko;
mod utils;

use anyhow::Error;
use tokio::sync::mpsc;

const MESSAGE_QUEUE_SIZE: usize = 100;

#[tokio::main]
async fn main() {
async fn main() -> Result<(), Error> {
init_logging();

let (avs_p2p_tx, avs_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
Expand All @@ -18,7 +20,8 @@ async fn main() {
p2p.start();

let node = node::Node::new(node_rx, avs_p2p_tx);
node.start();
node.entrypoint().await?;
Ok(())
}

fn init_logging() {
Expand Down
75 changes: 35 additions & 40 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,67 @@
use crate::taiko::Taiko;
use anyhow::{anyhow as err, Context, Error};
use tokio::sync::mpsc::{Receiver, Sender};

pub struct Node {
taiko: Taiko,
node_rx: Option<Receiver<String>>,
node_rx: Receiver<String>,
avs_p2p_tx: Sender<String>,
}

impl Node {
pub fn new(node_rx: Receiver<String>, avs_p2p_tx: Sender<String>) -> Self {
let taiko = Taiko::new();
let taiko = Taiko::new("http://127.0.0.1:1234");
Self {
taiko,
node_rx: Some(node_rx),
node_rx,
avs_p2p_tx,
}
}

/// Consumes the Node and starts two loops:
/// one for handling incoming messages and one for the block preconfirmation
pub fn start(mut self) {
pub async fn entrypoint(mut self) -> Result<(), Error> {
tracing::info!("Starting node");
self.start_new_msg_receiver_thread();
self.main_block_preconfirmation_loop()
}

fn main_block_preconfirmation_loop(&self) {
loop {
self.taiko.get_pending_l2_tx_lists();
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p();
self.taiko.submit_new_l2_blocks();
if let Err(err) = self.step().await {
tracing::error!("Node processing step failed: {}", err);
}
}
}

//TODO: remove after implementation of above methods
std::thread::sleep(std::time::Duration::from_secs(1));
async fn step(&mut self) -> Result<(), Error> {
if let Ok(msg) = self.node_rx.try_recv() {
self.process_incoming_message(msg).await?;
} else {
self.main_block_preconfirmation_step().await?;
}
Ok(())
}

fn commit_to_the_tx_lists(&self) {
//TODO: implement
async fn main_block_preconfirmation_step(&self) -> Result<(), Error> {
self.taiko
.get_pending_l2_tx_lists()
.await
.context("Failed to get pending l2 tx lists")?;
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p().await?;
self.taiko.submit_new_l2_blocks();
Ok(())
}

fn send_preconfirmations_to_the_avs_p2p(&self) {
let avs_p2p_tx = self.avs_p2p_tx.clone();
tokio::spawn(async move {
if let Err(e) = avs_p2p_tx.send("Hello from node!".to_string()).await {
tracing::error!("Failed to send message to avs_p2p_tx: {}", e);
}
});
async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> {
tracing::debug!("Node received message: {}", msg);
Ok(())
}

fn start_new_msg_receiver_thread(&mut self) {
if let Some(node_rx) = self.node_rx.take() {
tokio::spawn(async move {
Self::handle_incoming_messages(node_rx).await;
});
} else {
tracing::error!("node_rx has already been moved");
}
fn commit_to_the_tx_lists(&self) {
//TODO: implement
}

async fn handle_incoming_messages(mut node_rx: Receiver<String>) {
loop {
tokio::select! {
Some(message) = node_rx.recv() => {
tracing::debug!("Node received message: {}", message);
}
}
}
async fn send_preconfirmations_to_the_avs_p2p(&self) -> Result<(), Error> {
self.avs_p2p_tx
.send("Hello from node!".to_string())
.await
.map_err(|e| err!("Failed to send message to avs_p2p_tx: {}", e))
}
}
49 changes: 45 additions & 4 deletions Node/src/taiko/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,56 @@
pub struct Taiko {}
use crate::utils::rpc_client::RpcClient;
use anyhow::Error;
use serde_json::Value;

pub struct Taiko {
rpc_client: RpcClient,
}

impl Taiko {
pub fn new() -> Self {
Self {}
pub fn new(url: &str) -> Self {
Self {
rpc_client: RpcClient::new(url),
}
}

pub fn get_pending_l2_tx_lists(&self) {
pub async fn get_pending_l2_tx_lists(&self) -> Result<Value, Error> {
tracing::debug!("Getting L2 tx lists");
self.rpc_client
.call_method("RPC.GetL2TxLists", vec![])
.await
}

pub fn submit_new_l2_blocks(&self) {
tracing::debug!("Submitting new L2 blocks");
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::utils::rpc_server::test::RpcServer;
use std::net::SocketAddr;

#[tokio::test]
async fn test_get_pending_l2_tx_lists() {
tracing_subscriber::fmt::init();

// Start the RPC server
let mut rpc_server = RpcServer::new();
let addr: SocketAddr = "127.0.0.1:3030".parse().unwrap();
rpc_server.start_test_responses(addr).await.unwrap();

let taiko = Taiko::new("http://127.0.0.1:3030");
let json = taiko.get_pending_l2_tx_lists().await.unwrap();

assert_eq!(json["result"]["TxLists"].as_array().unwrap().len(), 1);
assert_eq!(json["result"]["TxLists"][0].as_array().unwrap().len(), 3);
assert_eq!(json["result"]["TxLists"][0][0]["type"], "0x0");
assert_eq!(json["result"]["TxLists"][0][0]["hash"], "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd");
assert_eq!(json["result"]["TxLists"][0][1]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][1]["hash"], "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b");
assert_eq!(json["result"]["TxLists"][0][2]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][2]["hash"], "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193");
rpc_server.stop().await;
}
}
2 changes: 2 additions & 0 deletions Node/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod rpc_client;
pub mod rpc_server;
28 changes: 28 additions & 0 deletions Node/src/utils/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use anyhow::Error;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use serde_json::Value;
use std::time::Duration;

pub struct RpcClient {
client: HttpClient,
}

impl RpcClient {
pub fn new(url: &str) -> Self {
// let client = HttpClientBuilder::default().build(url).unwrap();

let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(1))
.build(url)
.unwrap();
RpcClient { client }
}

pub async fn call_method(&self, method: &str, params: Vec<Value>) -> Result<Value, Error> {
self.client
.request(method, params)
.await
.map_err(Error::from)
}
}
112 changes: 112 additions & 0 deletions Node/src/utils/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#[cfg(test)]
pub mod test {
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use lazy_static::lazy_static;
use serde_json::json;
use std::net::SocketAddr;
use tracing::info;

pub struct RpcServer {
handle: Option<ServerHandle>,
}

impl RpcServer {
pub fn new() -> Self {
RpcServer {
handle: None::<ServerHandle>,
}
}

#[cfg(test)]
pub async fn start_test_responses(
&mut self,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let server = ServerBuilder::default().build(addr).await?;
let mut module = RpcModule::new(());

module.register_async_method("RPC.GetL2TxLists", |_, _, _| async {
TX_LISTS_RESPONSE.clone()
})?;

let handle = server.start(module);
tokio::spawn(handle.clone().stopped());

self.handle = Some(handle);
Ok(())
}

pub async fn stop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.stop().unwrap();
}
info!("Server stopped");
}
}

lazy_static! {
pub static ref TX_LISTS_RESPONSE: serde_json::Value = json!({
"result": {
"TxLists": [
[
{
"type": "0x0",
"chainId": "0x28c61",
"nonce": "0x1",
"to": "0xbfadd5365bb2890ad832038837115e60b71f7cbb",
"gas": "0x267ac",
"gasPrice": "0x5e76e0800",
"maxPriorityFeePerGas": null,
"maxFeePerGas": null,
"value": "0x0",
"input": "0x40d097c30000000000000000000000004cea2c7d358e313f5d0287c475f9ae943fe1a913",
"v": "0x518e6",
"r": "0xb22da5cdc4c091ec85d2dda9054aa497088e55bd9f0335f39864ae1c598dd35",
"s": "0x6eee1bcfe6a1855e89dd23d40942c90a036f273159b4c4fd217d58169493f055",
"hash": "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd"
},
{
"type": "0x2",
"chainId": "0x28c61",
"nonce": "0x3f",
"to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9",
"gas": "0x2c2c8",
"gasPrice": null,
"maxPriorityFeePerGas": "0x1",
"maxFeePerGas": "0x3",
"value": "0x5af3107a4000",
"input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0a400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000",
"accessList": [],
"v": "0x1",
"r": "0x36517a175a60d3026380318917976fa32c82e542850357a611af05d2212ab9a4",
"s": "0x32d89dce30d76287ddba907b0c662cd09dc30891b1c9c2ef644edfc53160b298",
"yParity": "0x1",
"hash": "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b"
},
{
"type": "0x2",
"chainId": "0x28c61",
"nonce": "0x39",
"to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9",
"gas": "0x2c2c8",
"gasPrice": null,
"maxPriorityFeePerGas": "0x1",
"maxFeePerGas": "0x3",
"value": "0x5af3107a4000",
"input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0d400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000",
"accessList": [],
"v": "0x0",
"r": "0xc779421d1ee81dbd3dfbfad5fd632b45303b4513ea1b8ac0bc647f5430cd97b9",
"s": "0x13cedef844bf5a954183182992ffbf9b8b23331de255157528be7da6614618b2",
"yParity": "0x0",
"hash": "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193"
}
]
]
},
"error": null,
"id": 1
});
}
}

0 comments on commit 463204c

Please sign in to comment.