Skip to content

Commit

Permalink
feat(rpc): implement wasmtime_rpc::link_{item,instance}
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jun 4, 2024
1 parent 9cea5e6 commit da81f96
Show file tree
Hide file tree
Showing 7 changed files with 591 additions and 52 deletions.
102 changes: 98 additions & 4 deletions crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ use futures::TryStreamExt as _;
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
use tokio::try_join;
use tokio_util::codec::Encoder;
use tracing::{instrument, warn};
use tracing::{debug, instrument, trace, warn};
use wasm_tokio::cm::AsyncReadValue as _;
use wasm_tokio::{
AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreStringEncoder, Leb128Encoder,
Utf8Encoder,
};
use wasmtime::component::types::{self, Case, Field};
use wasmtime::component::{LinkerInstance, Type, Val};
use wasmtime::{AsContextMut, StoreContextMut};
use wasmtime::{AsContextMut, Engine, StoreContextMut};
use wasmtime_wasi::WasiView;
use wrpc_transport::{Index as _, Invocation, Invoke, Session};

Expand All @@ -44,6 +44,7 @@ impl<T, W> ValEncoder<'_, T, W> {
}
}

#[must_use]
pub fn with_type<'a>(&'a mut self, ty: &'a Type) -> ValEncoder<'a, T, W> {
ValEncoder {
store: self.store.as_context_mut(),
Expand Down Expand Up @@ -446,7 +447,9 @@ async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Resu
Ok(u128::from_le_bytes(buf))
}

pub async fn read_value<T, R>(
/// Read encoded value of type [`Type`] from an [`AsyncRead`] into a [`Val`]
#[instrument(level = "trace", skip_all, fields(ty, path))]
async fn read_value<T, R>(
store: &mut impl AsContextMut<Data = T>,
r: &mut Pin<&mut R>,
val: &mut Val,
Expand Down Expand Up @@ -534,6 +537,7 @@ where
for i in 0..n {
let mut v = Val::Bool(false);
path.push(i);
trace!(i, "reading list element value");
Box::pin(read_value(store, r, &mut v, &ty, &path)).await?;
path.pop();
vs.push(v);
Expand All @@ -548,6 +552,7 @@ where
for (i, Field { name, ty }) in fields.enumerate() {
let mut v = Val::Bool(false);
path.push(i);
trace!(i, "reading struct field value");
Box::pin(read_value(store, r, &mut v, &ty, &path)).await?;
path.pop();
vs.push((name.to_string(), v));
Expand All @@ -562,6 +567,7 @@ where
for (i, ty) in types.enumerate() {
let mut v = Val::Bool(false);
path.push(i);
trace!(i, "reading tuple element value");
Box::pin(read_value(store, r, &mut v, &ty, &path)).await?;
path.pop();
vs.push(v);
Expand All @@ -583,6 +589,7 @@ where
let name = name.to_string();
if let Some(ty) = ty {
let mut v = Val::Bool(false);
trace!(variant = name, "reading nested variant value");
Box::pin(read_value(store, r, &mut v, &ty, path)).await?;
*val = Val::Variant(name, Some(Box::new(v)));
} else {
Expand All @@ -608,6 +615,7 @@ where
let ok = r.read_option_status().await?;
if ok {
let mut v = Val::Bool(false);
trace!("reading nested `option::some` value");
Box::pin(read_value(store, r, &mut v, &ty.ty(), path)).await?;
*val = Val::Option(Some(Box::new(v)));
} else {
Expand All @@ -620,13 +628,15 @@ where
if ok {
if let Some(ty) = ty.ok() {
let mut v = Val::Bool(false);
trace!("reading nested `result::ok` value");
Box::pin(read_value(store, r, &mut v, &ty, path)).await?;
*val = Val::Result(Ok(Some(Box::new(v))));
} else {
*val = Val::Result(Ok(None));
}
} else if let Some(ty) = ty.err() {
let mut v = Val::Bool(false);
trace!("reading nested `result::err` value");
Box::pin(read_value(store, r, &mut v, &ty, path)).await?;
*val = Val::Result(Err(Some(Box::new(v))));
} else {
Expand Down Expand Up @@ -696,6 +706,90 @@ pub trait WrpcView<C: Invoke>: Send {
fn client(&self) -> &C;
}

/// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
#[instrument(level = "trace", skip_all)]
pub fn link_item<'a, C, V>(
engine: &Engine,
linker: &mut LinkerInstance<V>,
ty: types::ComponentItem,
instance: impl Into<Arc<str>>,
name: impl Into<Arc<str>>,
cx: C::Context,
) -> wasmtime::Result<()>
where
V: WrpcView<C> + WasiView,
C: Invoke,
C::Error: Into<wasmtime::Error>,
C::Context: Clone + 'static,
<C::Session as Session>::TransportError: Into<wasmtime::Error>,
<C::Outgoing as wrpc_transport::Index<C::NestedOutgoing>>::Error: Into<wasmtime::Error>,
C::NestedOutgoing: 'static,
<C::NestedOutgoing as wrpc_transport::Index<C::NestedOutgoing>>::Error: Into<wasmtime::Error>,
C::Incoming: Unpin + Sized + 'static,
<C::Incoming as wrpc_transport::Index<C::Incoming>>::Error:
Into<Box<dyn std::error::Error + Send + Sync>>,
{
let instance = instance.into();
match ty {
types::ComponentItem::ComponentFunc(ty) => {
let name = name.into();
debug!(?instance, ?name, "linking function");
link_function(linker, ty, instance, name, cx)?
}
types::ComponentItem::CoreFunc(_) => {
bail!("polyfilling core functions not supported yet")
}
types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
types::ComponentItem::Component(ty) => {
for (name, ty) in ty.imports(&engine) {
debug!(?instance, name, "linking component item");
link_item(engine, linker, ty, "", name, cx.clone())?;
}
}
types::ComponentItem::ComponentInstance(ty) => {
let name = name.into();
let mut linker = linker
.instance(&name)
.with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
debug!(?instance, ?name, "linking instance");
link_instance(engine, &mut linker, ty, name, cx)?
}
types::ComponentItem::Type(_) => {}
types::ComponentItem::Resource(_) => bail!("polyfilling resources not supported yet"),
}
Ok(())
}

/// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
#[instrument(level = "trace", skip_all)]
pub fn link_instance<'a, C, V>(
engine: &Engine,
linker: &mut LinkerInstance<V>,
ty: types::ComponentInstance,
name: impl Into<Arc<str>>,
cx: C::Context,
) -> wasmtime::Result<()>
where
V: WrpcView<C> + WasiView,
C: Invoke,
C::Error: Into<wasmtime::Error>,
C::Context: Clone + 'static,
<C::Session as Session>::TransportError: Into<wasmtime::Error>,
<C::Outgoing as wrpc_transport::Index<C::NestedOutgoing>>::Error: Into<wasmtime::Error>,
C::NestedOutgoing: 'static,
<C::NestedOutgoing as wrpc_transport::Index<C::NestedOutgoing>>::Error: Into<wasmtime::Error>,
C::Incoming: Unpin + Sized + 'static,
<C::Incoming as wrpc_transport::Index<C::Incoming>>::Error:
Into<Box<dyn std::error::Error + Send + Sync>>,
{
let instance = name.into();
for (name, ty) in ty.exports(&engine) {
debug!(name, "linking instance item");
link_item(engine, linker, ty, Arc::clone(&instance), name, cx.clone())?
}
Ok(())
}

/// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
#[instrument(level = "trace", skip_all)]
pub fn link_function<'a, C, V>(
Expand Down Expand Up @@ -769,7 +863,7 @@ where
for (i, (v, ref ty)) in zip(results, ty.results()).enumerate() {
read_value(&mut store, &mut incoming, v, ty, &[i])
.await
.context("failed to decode result value")?;
.with_context(|| format!("failed to decode return value {i}"))?;
}
Ok(())
},
Expand Down
Loading

0 comments on commit da81f96

Please sign in to comment.