Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multi-backend async support #66

Merged
merged 12 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions sube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,66 @@ repository = "https://github.com/valibre-org/virto-dk/sube"
async-once-cell = "0.4.4"
blake2 = { version = "0.10.5", default-features = false }
codec = { version = "3.1.2", package = "parity-scale-codec", default-features = false }
frame-metadata = { version = "16.0.0", default-features = false, features = ["serde_full", "decode"] }
frame-metadata = { version = "16.0.0", default-features = false, features = [
"serde_full",
"decode",
] }
hex = { version = "0.4.3", default-features = false, features = ["alloc"] }
jsonrpc = { version = "0.12.1", default-features = false, optional = true }
log = "0.4.17"
scale-info = { version = "2.1.1", default-features = false, optional = true }
scales = { path="../scales", package = "scale-serialization", default-features = false, features = ["codec", "experimental-serializer", "json", "std"] }
scales = { path = "../scales", package = "scale-serialization", default-features = false, features = [
"codec",
"experimental-serializer",
"json",
"std",
] }
serde = { version = "1.0.137", default-features = false }
# TODO: shouldn't be a base dependeny. remove after: https://github.com/virto-network/virto-sdk/issues/53
serde_json = { version = "1.0.80", default-features = false, features = ["alloc", "arbitrary_precision"] }
serde_json = { version = "1.0.80", default-features = false, features = [
"alloc",
"arbitrary_precision",
] }
twox-hash = { version = "1.6.2", default-features = false }
url = "2.5.0"

# http backend
reqwest = { version = "0.12.5", optional = true, features = ["json"]}

# ws backend
futures-channel = { version = "0.3.21", default-features = false, features = ["alloc"], optional = true }
futures-util = { version = "0.3.21", default-features = false, features = ["sink"], optional = true }
async-mutex = { version = "1.4.0", optional = true }
futures-channel = { version = "0.3.21", default-features = false, features = [
"alloc",
], optional = true }
futures-util = { version = "0.3.21", default-features = false, features = [
"sink",
], optional = true }

async-tls = { version = "0.11.0", default-features = false, optional = true }

# bin target
async-std = { version = "1.11.0", optional = true }
paste = { version = "1.0" }
wasm-bindgen = { version = "0.2.91", optional = true }
wasm-bindgen = { version = "0.2.92", optional = true }
once_cell = { version = "1.17.1", optional = true }
heapless = { version = "0.7.16", optional = true }
heapless = "0.8.0"
anyhow = { version = "1.0.40", optional = true }
rand_core = {version = "0.6.3", optional = true }
rand_core = { version = "0.6.3", optional = true }
ewebsock = { git = "https://github.com/S0c5/ewebsock.git", optional = true, branch = "enhacement/aviod-blocking-operations-with-mpsc-futures" }
env_logger = "0.11.3"
no-std-async = "1.1.2"


[dev-dependencies]
async-std = { version = "1.11.0", features = ["attributes", "tokio1"] }
hex-literal = "0.3.4"
libwallet = { path = "../libwallet", default-features=false, features=["substrate", "mnemonic", "sr25519", "util_pin", "rand", "std" ] }
libwallet = { path = "../libwallet", default-features = false, features = [
"substrate",
"mnemonic",
"sr25519",
"util_pin",
"rand",
"std",
] }
rand_core = "0.6.3"

[features]
Expand All @@ -58,11 +82,19 @@ json = ["scales/json"]
std = []
no_std = []


v14 = ["dep:scale-info", "frame-metadata/current"]
ws = ["dep:async-mutex", "dep:async-std", "dep:ewebsock", "dep:futures-channel", "dep:futures-util", "dep:jsonrpc", "async-std/unstable"]
ws = [
"dep:async-std",
"dep:ewebsock",
"dep:futures-channel",
"dep:futures-util",
"dep:jsonrpc",
"async-std/unstable",
]
wss = ["dep:async-tls", "ws", "ewebsock/tls", "async-std/unstable"]
examples = ["dep:rand_core"]
js = ["http-web", "json", "v14", 'async-std/unstable', "ws", "dep:rand_core"]
js = ["http-web", "json", "v14", 'async-std/unstable', "wss", "dep:rand_core"]

[package.metadata.docs.rs]
features = ["http"]
Expand All @@ -71,4 +103,4 @@ features = ["http"]
members = [
"sube-js",
"cli"
]
]
32 changes: 32 additions & 0 deletions sube/examples/query_identity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use core::future::{Future, IntoFuture};
use serde::{Deserialize, Serialize};
use serde_json::{from_value, Value};
use sube::{sube, Response};

#[async_std::main]
async fn main() -> sube::Result<()> {
env_logger::init();

let result = sube!("ws://localhost:11004/identity/superOf/0x6d6f646c6b762f636d7479738501000000000000000000000000000000000000").await?;

if let Response::Value(value) = result {
let data = serde_json::to_value(&value).expect("to be serializable");
println!(
"Account info: {}",
serde_json::to_string_pretty(&data).expect("it must return an str")
);
}

let query = format!("ws://localhost:11004/identity/identityOf/0xbe6ed76ac48d5c7f1c5d2cab8a1d1e7a451dcc24b624b088ef554fd47ba21139");

let r = sube!(&query).await?;

if let Response::Value(ref v) = r {
let json_value = serde_json::to_value(v).expect("to be serializable");
println!("json: {:?}", json_value);
let x = serde_json::to_string_pretty(&json_value).expect("it must return an str");
println!("Account info: {:?}", x);
}

Ok(())
}
99 changes: 72 additions & 27 deletions sube/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,9 @@ impl<'a> SubeBuilder<'a, (), ()> {
let url = chain_string_to_url(url.ok_or(Error::BadInput)?)?;
let path = url.path();

log::info!("building the backend for {}", url);
log::trace!("building the backend for {}", url);

let backend = BACKEND
.get_or_try_init(get_backend_by_url(url.clone()))
.await?;

let meta = META
.get_or_try_init(async {
match metadata {
Some(m) => Ok(m),
None => backend.metadata().await.map_err(|_| Error::BadMetadata),
}
})
.await?;
let (backend, meta) = get_multi_backend_by_url(url.clone(), metadata).await?;

Ok(match path {
"_meta" => Response::Meta(meta),
Expand Down Expand Up @@ -125,18 +114,7 @@ where
let path = url.path();
let body = body.ok_or(Error::BadInput)?;

let backend = BACKEND
.get_or_try_init(get_backend_by_url(url.clone()))
.await?;

let meta = META
.get_or_try_init(async {
match metadata {
Some(m) => Ok(m),
None => backend.metadata().await.map_err(|_| Error::BadMetadata),
}
})
.await?;
let (backend, meta) = get_multi_backend_by_url(url.clone(), metadata).await?;

Ok(match path {
"_meta" => Response::Meta(meta),
Expand All @@ -150,8 +128,75 @@ where
}
}

static BACKEND: async_once_cell::OnceCell<AnyBackend> = async_once_cell::OnceCell::new();
static META: async_once_cell::OnceCell<Metadata> = async_once_cell::OnceCell::new();
use heapless::FnvIndexMap as Map;
use no_std_async::Mutex;

static INSTANCE_BACKEND: async_once_cell::OnceCell<
Mutex<Map<String, Mutex<&'static AnyBackend>, 16>>,
> = async_once_cell::OnceCell::new();

static INSTANCE_METADATA: async_once_cell::OnceCell<
Mutex<Map<String, Mutex<&'static Metadata>, 16>>,
> = async_once_cell::OnceCell::new();

async fn get_metadata(backend: &AnyBackend, metadata: Option<Metadata>) -> SubeResult<Metadata> {
match metadata {
Some(m) => Ok(m),
None => backend.metadata().await.map_err(|_| Error::BadMetadata),
}
}

async fn get_multi_backend_by_url<'a>(
url: Url,
metadata: Option<Metadata>,
) -> SubeResult<(&'a AnyBackend, &'a Metadata)> {
let mut instance_backend = INSTANCE_BACKEND
.get_or_init(async { Mutex::new(Map::new()) })
.await
.lock()
.await;

let mut instance_metadata = INSTANCE_METADATA
.get_or_init(async { Mutex::new(Map::new()) })
.await
.lock()
.await;

let base_path = format!(
"{}://{}:{}",
url.scheme(),
url.host_str().expect("url to have a host"),
url.port().unwrap_or(80)
);

let cached_b = instance_backend.get(&base_path);
let cached_m = instance_metadata.get(&base_path);

match (cached_b, cached_m) {
(Some(b), Some(m)) => {
let b = *b.lock().await;
let m = *m.lock().await;
Ok((b, m))
}
_ => {
let backend = Box::new(get_backend_by_url(url.clone()).await?);
let backend = Box::leak::<'static>(backend);

instance_backend
.insert(base_path.clone(), Mutex::new(backend))
.map_err(|_| Error::CantInitBackend)?;

let metadata = Box::new(get_metadata(backend, metadata).await?);
let metadata = Box::leak::<'static>(metadata);

instance_metadata
.insert(base_path.clone(), Mutex::new(metadata))
.map_err(|_| Error::BadMetadata)?;

Ok((backend, metadata))
}
}
}

pub type BoxFuture<'a, T> = core::pin::Pin<Box<dyn Future<Output = T> + 'a>>;

Expand Down
8 changes: 6 additions & 2 deletions sube/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod prelude {
}

/// Surf based backend
#[cfg(any(feature = "http", feature = "http-web", feature = "js"))]
#[cfg(any(feature = "http", feature = "http-web"))]
pub mod http;
/// Tungstenite based backend
#[cfg(feature = "ws")]
Expand All @@ -58,7 +58,7 @@ mod hasher;
mod meta_ext;
mod signer;

#[cfg(any(feature = "http", feature = "http-web", feature = "ws", feature = "js"))]
#[cfg(any(feature = "http", feature = "http-web", feature = "ws"))]
pub mod rpc;
pub mod util;

Expand Down Expand Up @@ -488,6 +488,10 @@ pub enum Error {
AccountNotFound,
ConstantNotFound(String),
Platform(String),
CantInitBackend,
CantDecodeReponseForMeta,
CantDecodeRawQueryResponse,
CantFindMethodInPallet,
}

impl fmt::Display for Error {
Expand Down
2 changes: 1 addition & 1 deletion sube/src/meta_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl StorageKey {
.storage
.as_ref()
.and_then(|s| s.entries.iter().find(|e| e.name == item))
.ok_or(crate::Error::StorageKeyNotFound)?;
.ok_or(crate::Error::CantFindMethodInPallet)?;
log::trace!("map_keys={}", map_keys.iter().map(|x| x.as_ref()).collect::<Vec<&str>>().join(", "));
entry.ty.key(registry, &meta.name, &entry.name, map_keys)
}
Expand Down
11 changes: 6 additions & 5 deletions sube/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
)
.await
.map_err(|err| {
log::info!("error {:?}", err);
log::error!("error state_queryStorageAt {:?}", err);
crate::Error::StorageKeyNotFound
})
}
Expand All @@ -75,7 +75,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
)
.await
.map_err(|err| {
log::info!("error {:?}", err);
log::error!("error paged {:?}", err);
crate::Error::StorageKeyNotFound
})?;
log::info!("rpc call {:?}", r);
Expand All @@ -91,12 +91,12 @@ impl<R: Rpc> Backend for RpcClient<R> {
.rpc("state_getStorage", &[&format!("\"{}\"", &key)])
.await
.map_err(|e| {
log::debug!("RPC failure: {}", e);
log::error!("RPC failure: {}", e);
// NOTE it could fail for more reasons
crate::Error::StorageKeyNotFound
})?;

let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::StorageKeyNotFound)?;
let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::CantDecodeRawQueryResponse)?;

Ok(response)
}
Expand All @@ -109,6 +109,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
.rpc::<serde_json::Value>("author_submitExtrinsic", &[&format!("\"{}\"", &extrinsic)])
.await
.map_err(|e| crate::Error::Node(e.to_string()))?;

Ok(())
}

Expand All @@ -118,7 +119,7 @@ impl<R: Rpc> Backend for RpcClient<R> {
.rpc("state_getMetadata", &[])
.await
.map_err(|e| crate::Error::Node(e.to_string()))?;
let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::StorageKeyNotFound)?;
let response = hex::decode(&res[2..]).map_err(|_err| crate::Error::CantDecodeReponseForMeta)?;
let meta = from_bytes(&mut response.as_slice()).map_err(|_| crate::Error::BadMetadata)?;
log::trace!("Metadata {:#?}", meta);
Ok(meta)
Expand Down
18 changes: 13 additions & 5 deletions sube/src/ws.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloc::{collections::BTreeMap, sync::Arc};

use async_mutex::Mutex;
use no_std_async::Mutex;
use ewebsock::{WsEvent, WsMessage as Message, WsReceiver as Rx, WsSender as Tx};
use futures_channel::{mpsc, oneshot};
use futures_util::StreamExt as _;
Expand All @@ -22,6 +22,8 @@ use crate::{
Error,
};

const MAX_BUFFER: usize = usize::MAX >> 3;

type Id = u32;

pub struct Backend {
Expand Down Expand Up @@ -60,13 +62,19 @@ impl Rpc for Backend {
.lock()
.await
.try_send(Message::Text(msg))
.map_err(|_| standard_error(StandardError::InternalError, None))?;
.map_err(|err| {
log::error!("Error tx lock message: {:?}", err);
standard_error(StandardError::InternalError, None)
})?;

log::info!("sent CMD");
// wait for the matching response to arrive
let res = recv
.await
.map_err(|_| standard_error(StandardError::InternalError, None))?
.map_err(|err| {
log::error!("Error receiving message: {:?}", err);
standard_error(StandardError::InternalError, None)
})?
.result()?;

Ok(res)
Expand All @@ -84,8 +92,8 @@ impl Backend {

let (tx, rx) =
ewebsock::connect(url, ewebsock::Options::default()).map_err(Error::Platform)?;

let (sender, recv) = mpsc::channel::<Message>(0);
let (sender, recv) = mpsc::channel::<Message>(MAX_BUFFER);

let backend = Backend {
tx: Mutex::new(sender),
Expand Down
Loading