Skip to content

Commit

Permalink
Add an rpc client
Browse files Browse the repository at this point in the history
  • Loading branch information
ebin-mathews committed Jan 2, 2024
1 parent ae88e1d commit f9b97ae
Show file tree
Hide file tree
Showing 9 changed files with 5,151 additions and 0 deletions.
4,093 changes: 4,093 additions & 0 deletions json_rpc_client/Cargo.lock

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions json_rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "jito-block-engine-json-rpc-client"
version = "0.1.0"
edition = "2021"
description = "A sample rpc client to generate and send requests to jito block engine server"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.68"
bincode = "1.3.3"
log = "0.4.17"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107"
solana-rpc-client = "=1.16"
solana-rpc-client-api = "=1.16"
solana-sdk = "=1.16"
solana-transaction-status = "=1.16"
thiserror = "1.0.40"
tokio = { version = "~1.14.1", features = ["rt-multi-thread"] }

[workspace]
125 changes: 125 additions & 0 deletions json_rpc_client/src/jsonrpc_client/client_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
pub use reqwest;
use solana_rpc_client_api::{client_error::ErrorKind, request};
use solana_sdk::{
signature::SignerError, transaction::TransactionError, transport::TransportError,
};
use thiserror::Error as ThisError;

use crate::jsonrpc_client::request::RpcRequest;

#[derive(ThisError, Debug)]
#[error("{kind}")]
pub struct Error {
pub request: Option<RpcRequest>,

#[source]
pub kind: ErrorKind,
}

impl Error {
pub fn new_with_request(kind: ErrorKind, request: RpcRequest) -> Self {
Self {
request: Some(request),
kind,
}
}

pub fn into_with_request(self, request: RpcRequest) -> Self {
Self {
request: Some(request),
..self
}
}

pub fn request(&self) -> Option<&RpcRequest> {
self.request.as_ref()
}

pub fn kind(&self) -> &ErrorKind {
&self.kind
}

pub fn get_transaction_error(&self) -> Option<TransactionError> {
self.kind.get_transaction_error()
}
}

impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Self {
Self {
request: None,
kind,
}
}
}

impl From<TransportError> for Error {
fn from(err: TransportError) -> Self {
Self {
request: None,
kind: err.into(),
}
}
}

impl From<Error> for TransportError {
fn from(client_error: Error) -> Self {
client_error.kind.into()
}
}

impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self {
request: None,
kind: err.into(),
}
}
}

impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Self {
Self {
request: None,
kind: err.into(),
}
}
}

impl From<request::RpcError> for Error {
fn from(err: request::RpcError) -> Self {
Self {
request: None,
kind: err.into(),
}
}
}

impl From<serde_json::error::Error> for Error {
fn from(err: serde_json::error::Error) -> Self {
Self {
request: None,
kind: err.into(),
}
}
}

impl From<SignerError> for Error {
fn from(err: SignerError) -> Self {
Self {
request: None,
kind: err.into(),
}
}
}

impl From<TransactionError> for Error {
fn from(err: TransactionError) -> Self {
Self {
request: None,
kind: err.into(),
}
}
}

pub type Result<T> = std::result::Result<T, Error>;
218 changes: 218 additions & 0 deletions json_rpc_client/src/jsonrpc_client/http_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::{Duration, Instant},
};

use async_trait::async_trait;
use log::debug;
use reqwest::{
self,
header::{CONTENT_TYPE, RETRY_AFTER},
StatusCode,
};
use solana_rpc_client_api::{
custom_error,
error_object::RpcErrorObject,
request::{RpcError, RpcResponseErrorData},
response::RpcSimulateTransactionResult,
};
use tokio::time::sleep;

use crate::jsonrpc_client::{client_error::Result, request::RpcRequest, rpc_sender::RpcSender};

pub struct HttpSender {
client: Arc<reqwest::Client>,
url: String,
request_id: AtomicU64,
stats: RwLock<solana_rpc_client::rpc_sender::RpcTransportStats>,
}

/// Nonblocking [`RpcSender`] over HTTP.
impl HttpSender {
/// Create an HTTP RPC sender.
///
/// The URL is an HTTP URL, usually for port 8899, as in
/// "http://localhost:8899". The sender has a default timeout of 30 seconds.
pub fn new<U: ToString>(url: U) -> Self {
Self::new_with_timeout(url, Duration::from_secs(30))
}

/// Create an HTTP RPC sender.
///
/// The URL is an HTTP URL, usually for port 8899.
pub fn new_with_timeout<U: ToString>(url: U, timeout: Duration) -> Self {
let client = Arc::new(
reqwest::Client::builder()
.timeout(timeout)
.pool_idle_timeout(timeout)
.build()
.expect("build rpc client"),
);

Self {
client,
url: url.to_string(),
request_id: AtomicU64::new(0),
stats: RwLock::new(solana_rpc_client::rpc_sender::RpcTransportStats::default()),
}
}
}

struct StatsUpdater<'a> {
stats: &'a RwLock<solana_rpc_client::rpc_sender::RpcTransportStats>,
request_start_time: Instant,
rate_limited_time: Duration,
}

impl<'a> StatsUpdater<'a> {
fn new(stats: &'a RwLock<solana_rpc_client::rpc_sender::RpcTransportStats>) -> Self {
Self {
stats,
request_start_time: Instant::now(),
rate_limited_time: Duration::default(),
}
}

fn add_rate_limited_time(&mut self, duration: Duration) {
self.rate_limited_time += duration;
}
}

impl<'a> Drop for StatsUpdater<'a> {
fn drop(&mut self) {
let mut stats = self.stats.write().unwrap();
stats.request_count += 1;
stats.elapsed_time += Instant::now().duration_since(self.request_start_time);
stats.rate_limited_time += self.rate_limited_time;
}
}

#[async_trait]
impl RpcSender for HttpSender {
fn get_transport_stats(&self) -> solana_rpc_client::rpc_sender::RpcTransportStats {
self.stats.read().unwrap().clone()
}

async fn send(
&self,
request: RpcRequest,
params: serde_json::Value,
) -> Result<serde_json::Value> {
let mut stats_updater = StatsUpdater::new(&self.stats);

let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
let request_json = request.build_request_json(request_id, params).to_string();

let mut too_many_requests_retries = 5;
loop {
let response = {
let client = self.client.clone();
let request_json = request_json.clone();
client
.post(&self.url)
.header(CONTENT_TYPE, "application/json")
.body(request_json)
.send()
.await
}?;

if !response.status().is_success() {
if response.status() == StatusCode::TOO_MANY_REQUESTS
&& too_many_requests_retries > 0
{
let mut duration = Duration::from_millis(500);
if let Some(retry_after) = response.headers().get(RETRY_AFTER) {
if let Ok(retry_after) = retry_after.to_str() {
if let Ok(retry_after) = retry_after.parse::<u64>() {
if retry_after < 120 {
duration = Duration::from_secs(retry_after);
}
}
}
}

too_many_requests_retries -= 1;
debug!(
"Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
response, too_many_requests_retries, duration
);

sleep(duration).await;
stats_updater.add_rate_limited_time(duration);
continue;
}
return Err(response.error_for_status().unwrap_err().into());
}

let mut json = response.json::<serde_json::Value>().await?;
if json["error"].is_object() {
return match serde_json::from_value::<RpcErrorObject>(json["error"].clone()) {
Ok(rpc_error_object) => {
let data = match rpc_error_object.code {
solana_rpc_client_api::custom_error::JSON_RPC_SERVER_ERROR_SEND_TRANSACTION_PREFLIGHT_FAILURE => {
match serde_json::from_value::<RpcSimulateTransactionResult>(json["error"]["data"].clone()) {
Ok(data) => RpcResponseErrorData::SendTransactionPreflightFailure(data),
Err(err) => {
debug!("Failed to deserialize RpcSimulateTransactionResult: {:?}", err);
RpcResponseErrorData::Empty
}
}
},
custom_error::JSON_RPC_SERVER_ERROR_NODE_UNHEALTHY => {
match serde_json::from_value::<custom_error::NodeUnhealthyErrorData>(json["error"]["data"].clone()) {
Ok(custom_error::NodeUnhealthyErrorData {num_slots_behind}) => RpcResponseErrorData::NodeUnhealthy {num_slots_behind},
Err(_err) => {
RpcResponseErrorData::Empty
}
}
},
_ => RpcResponseErrorData::Empty
};

Err(RpcError::RpcResponseError {
code: rpc_error_object.code,
message: rpc_error_object.message,
data,
}
.into())
}
Err(err) => Err(RpcError::RpcRequestError(format!(
"Failed to deserialize RPC error response: {} [{}]",
serde_json::to_string(&json["error"]).unwrap(),
err
))
.into()),
};
}
return Ok(json["result"].take());
}
}

fn url(&self) -> String {
self.url.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test(flavor = "multi_thread")]
async fn http_sender_on_tokio_multi_thread() {
let http_sender = HttpSender::new("http://localhost:1234".to_string());
let _ = http_sender
.send(RpcRequest::GetTipAccounts, serde_json::Value::Null)
.await;
}

#[tokio::test(flavor = "current_thread")]
async fn http_sender_on_tokio_current_thread() {
let http_sender = HttpSender::new("http://localhost:1234".to_string());
let _ = http_sender
.send(RpcRequest::GetTipAccounts, serde_json::Value::Null)
.await;
}
}
5 changes: 5 additions & 0 deletions json_rpc_client/src/jsonrpc_client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod client_error;
pub mod http_sender;
pub mod request;
pub mod rpc_client;
pub mod rpc_sender;
Loading

0 comments on commit f9b97ae

Please sign in to comment.