Skip to content

Commit

Permalink
update: cleaning code + more sensible comments
Browse files Browse the repository at this point in the history
  • Loading branch information
erikziyunchi committed Nov 5, 2023
1 parent f2f1d72 commit fdec4f1
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 231 deletions.
68 changes: 38 additions & 30 deletions crates/water/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,35 @@ impl WATERClient {
self.debug = debug;
}

pub fn connect(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient connecting ...");

match &mut self.stream {
WATERClientType::Dialer(dialer) => {
dialer.connect(&self.config, addr, port)?;
}
_ => {
return Err(anyhow::anyhow!("[HOST] This client is not a Dialer"));
}
}
Ok(())
}

pub fn listen(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient listening ...");

match &mut self.stream {
WATERClientType::Listener(listener) => {
listener.listen(&self.config, addr, port)?;
}
_ => {
return Err(anyhow::anyhow!("[HOST] This client is not a Listener"));
}
}
Ok(())
}

// this will start a worker(WATM) in a separate thread -- returns a JoinHandle
pub fn run_worker(
&mut self,
) -> Result<std::thread::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
Expand All @@ -72,6 +101,7 @@ impl WATERClient {
}
}

// this will run the extry_fn(WATM) in the current thread -- replace Host when running
pub fn execute(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient Executing ...");

Expand All @@ -89,6 +119,7 @@ impl WATERClient {
Ok(())
}

// v0 func for Host to set pipe for canceling later
pub fn cancel_with(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient cancel_with ...");

Expand All @@ -97,12 +128,14 @@ impl WATERClient {
dialer.cancel_with(&self.config)?;
}
_ => {
return Err(anyhow::anyhow!("This client is not a Dialer"));
// for now this is only implemented for v0 dialer
return Err(anyhow::anyhow!("This client is not a v0 Dialer"));
}
}
Ok(())
}

// v0 func for Host to terminate the separate thread running worker(WATM)
pub fn cancel(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient canceling ...");

Expand All @@ -111,41 +144,16 @@ impl WATERClient {
dialer.cancel(&self.config)?;
}
_ => {
return Err(anyhow::anyhow!("This client is not a Dialer"));
}
}
Ok(())
}

pub fn connect(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient connecting ...");

match &mut self.stream {
WATERClientType::Dialer(dialer) => {
dialer.connect(&self.config, addr, port)?;
}
_ => {
return Err(anyhow::anyhow!("This client is not a listener"));
}
}
Ok(())
}

pub fn listen(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient listening ...");

match &mut self.stream {
WATERClientType::Listener(listener) => {
listener.listen(&self.config, addr, port)?;
}
_ => {
return Err(anyhow::anyhow!("This client is not a listener"));
// for now this is only implemented for v0 dialer
return Err(anyhow::anyhow!("This client is not a v0 Dialer"));
}
}
Ok(())
}

pub fn read(&mut self, buf: &mut Vec<u8>) -> Result<i64, anyhow::Error> {
info!("[HOST] WATERClient reading ...");

let read_bytes = match &mut self.stream {
WATERClientType::Dialer(dialer) => dialer.read(buf)?,
WATERClientType::Listener(listener) => listener.read(buf)?,
Expand Down
51 changes: 23 additions & 28 deletions crates/water/src/runtime/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,21 @@ impl H2O<Host> {

let mut error_occured = None;

// Get the version global from WATM
let version = module.exports().find_map(|global| {
info!(
"[HOST] WATERCore finding exported symbols from WASM bin: {:?}",
global.name()
);
match Version::parse(global.name()) {
Some(mut v) => {
info!("[HOST] WATERCore found version: {:?}", v.as_str());
match v {
Version::V0(_) => {
info!("[HOST] WATERCore configuring for V0");
match v.config_v0(conf) {
Ok(v) => Some(v),
Err(e) => {
info!("failed to configure for V0: {}", e);
error_occured = Some(e);
None
}
Version::V0(_) => match v.config_v0(conf) {
Ok(v) => Some(v),
Err(e) => {
info!("[HOST] WATERCore failed to configure for V0: {}", e);
error_occured = Some(e);
None
}
}
_ => Some(v),
},
_ => Some(v), // for now only V0 needs to be configured
}
}
None => None,
Expand All @@ -66,18 +60,20 @@ impl H2O<Host> {
if let Some(e) = error_occured {
return Err(e);
}
return Err(anyhow::Error::msg("WASM module version not found"));
return Err(anyhow::Error::msg("WATM module version not found"));
}

store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build());

if store.data().preview1_ctx.is_none() {
return Err(anyhow::anyhow!("Failed to retrieve preview1_ctx from Host"));
return Err(anyhow::anyhow!(
"[HOST] WATERCore Failed to retrieve preview1_ctx from Host"
));
}

wasmtime_wasi::add_to_linker(&mut linker, |h: &mut Host| h.preview1_ctx.as_mut().unwrap())?;

// initializing stuff for multithreading
// initializing stuff for multithreading -- currently not used yet (v1+ feature)
#[cfg(feature = "multithread")]
{
store.data_mut().wasi_threads = Some(Arc::new(WasiThreadsCtx::new(
Expand Down Expand Up @@ -117,7 +113,9 @@ impl H2O<Host> {
}

// export functions -- version independent
version_common::funcs::export_config(&mut linker, conf.config_wasm.clone())?;
{
version_common::funcs::export_config(&mut linker, conf.config_wasm.clone())?;
}

let instance = linker.instantiate(&mut store, &module)?;

Expand All @@ -138,14 +136,13 @@ impl H2O<Host> {
}

pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> {
// NOTE: version has been checked at the very beginning
self._init(conf.debug)?;
self._process_config(conf)?;
self._process_config(conf)?; // This is for now needed only by v1_preview
Ok(())
}

pub fn _init(&mut self, debug: bool) -> Result<(), anyhow::Error> {
info!("[HOST] WATERCore H2O calling _init from WASM...");
info!("[HOST] WATERCore calling _init from WASM...");

let store_lock_result = self.store.lock();

Expand All @@ -171,7 +168,7 @@ impl H2O<Host> {
}

pub fn _process_config(&mut self, config: &WATERConfig) -> Result<(), anyhow::Error> {
info!("[HOST] WATERCore H2O calling _process_config from WASM...");
info!("[HOST] WATERCore calling _process_config from WASM...");

let store_lock_result = self.store.lock();

Expand All @@ -185,13 +182,9 @@ impl H2O<Host> {
Some(func) => func,
None => {
// Currently not going to return error, where V0 don't need config;
// TODO: also remove this function, where config will be pulled from WASM
// NOTE: remove this function for v1_preview as well, where config will be pulled from WASM
info!("config function not found -- skipping");
return Ok(());

// return Err(anyhow::Error::msg(
// "_process_config function not found in WASM",
// ))
}
};

Expand All @@ -217,6 +210,8 @@ impl H2O<Host> {
.preview1_ctx
.as_mut()
.ok_or(anyhow::anyhow!("preview1_ctx in Store is None"))?;

// push the config file into WATM
let config_fd = ctx.push_file(Box::new(wasi_file), FileAccessMode::all())? as i32;

let params = vec![Val::I32(config_fd); config_fn.ty(&*store).params().len()];
Expand Down
Loading

0 comments on commit fdec4f1

Please sign in to comment.