diff --git a/crates/wasm/src/config.rs b/crates/wasm/src/config.rs index 3e4701d..b16d87a 100644 --- a/crates/wasm/src/config.rs +++ b/crates/wasm/src/config.rs @@ -3,10 +3,11 @@ use super::*; // A Config currently contains the local + remote ip & port #[derive(Debug, Deserialize, Clone)] pub struct Config { - pub local_address: String, - pub local_port: u32, pub remote_address: String, pub remote_port: u32, + pub local_address: String, + pub local_port: u32, + pub bypass: bool, } impl Default for Config { @@ -19,10 +20,11 @@ impl Default for Config { impl Config { pub fn new() -> Self { Config { - local_address: String::from("127.0.0.1"), - local_port: 8080, remote_address: String::from("example.com"), remote_port: 8082, + local_address: String::from("127.0.0.1"), + local_port: 8080, + bypass: false, } } } diff --git a/crates/wasm/src/version.rs b/crates/wasm/src/version.rs index 5cfb839..39236cd 100644 --- a/crates/wasm/src/version.rs +++ b/crates/wasm/src/version.rs @@ -1,3 +1,3 @@ // must have something like this in your WASM module, the following is just an example -// #[export_name = "V0"] +// #[export_name = "_water_v0"] // pub static V0: i32 = 0; diff --git a/crates/water/Cargo.toml b/crates/water/Cargo.toml index 3d72f99..1f4c646 100644 --- a/crates/water/Cargo.toml +++ b/crates/water/Cargo.toml @@ -30,3 +30,4 @@ bincode = "1.3" rustls = "0.20.6" rustls-pemfile = "1.0.0" zeroize = { version = "1.5.4", features = ["alloc"] } +serde_json = "1.0.107" \ No newline at end of file diff --git a/crates/water/src/config/mod.rs b/crates/water/src/config/mod.rs index 7bd0345..d5630f4 100644 --- a/crates/water/src/config/mod.rs +++ b/crates/water/src/config/mod.rs @@ -5,6 +5,7 @@ pub struct WATERConfig { pub entry_fn: String, pub config_wasm: String, pub client_type: WaterBinType, + pub debug: bool, } diff --git a/crates/water/src/globals.rs b/crates/water/src/globals.rs index c704154..3c9220f 100644 --- a/crates/water/src/globals.rs +++ b/crates/water/src/globals.rs @@ -3,20 +3,17 @@ pub const WASM_PATH: &str = "./proxy.wasm"; pub const CONFIG_WASM_PATH: &str = "./conf.json"; -const ALLOC_FN: &str = "alloc"; -const MEMORY: &str = "memory"; -const DEALLOC_FN: &str = "dealloc"; - pub const MAIN: &str = "main"; -pub const VERSION_FN: &str = "_version"; -pub const INIT_FN: &str = "_init"; -pub const CONFIG_FN: &str = "_config"; -pub const USER_READ_FN: &str = "_user_will_read"; -pub const WRITE_DONE_FN: &str = "_user_write_done"; -pub const WATER_BRIDGING_FN: &str = "_set_inbound"; -pub const READER_FN: &str = "_read"; -pub const WRITER_FN: &str = "_write"; -pub const DIAL_FN: &str = "_dial"; +pub const VERSION_FN: &str = "_water_version"; +pub const INIT_FN: &str = "_water_init"; +pub const CONFIG_FN: &str = "_water_config"; +pub const WATER_BRIDGING_FN: &str = "_water_set_inbound"; +pub const READER_FN: &str = "_water_read"; +pub const WRITER_FN: &str = "_water_write"; +pub const ACCEPT_FN: &str = "_water_accept"; +pub const DIAL_FN: &str = "_water_dial"; +pub const ASSOCIATE_FN: &str = "_water_associate"; +pub const CANCEL_FN: &str = "_water_cancel_with"; pub const RUNTIME_VERSION_MAJOR: i32 = 0x001aaaaa; pub const RUNTIME_VERSION: &str = "v0.1-alpha"; diff --git a/crates/water/src/runtime/client.rs b/crates/water/src/runtime/client.rs new file mode 100644 index 0000000..91cc23c --- /dev/null +++ b/crates/water/src/runtime/client.rs @@ -0,0 +1,270 @@ +use crate::runtime::*; +use listener::WATERListenerTrait; +use relay::WATERRelayTrait; +use stream::WATERStreamTrait; + +// =================== WATERClient Definition =================== +pub enum WATERClientType { + Dialer(Box), + Listener(Box), + Relay(Box), + Runner(WATERRunner), // This is a customized runner -- not like any stream +} + +pub struct WATERClient { + debug: bool, + + pub config: WATERConfig, + pub stream: WATERClientType, +} + +impl WATERClient { + pub fn new(conf: WATERConfig) -> Result { + info!("[HOST] WATERClient initializing ..."); + + let mut core = H2O::init(&conf)?; + core._prepare(&conf)?; + + // client_type: 0 -> Dialer, 1 -> Listener, 2 -> Runner + let water = match conf.client_type { + WaterBinType::Dial => { + let stream = match core.version { + Version::V0(_) => Box::new(v0::stream::WATERStream::init(&conf, core)?) + as Box, + Version::V1 => Box::new(v1::stream::WATERStream::init(&conf, core)?) + as Box, + _ => { + return Err(anyhow::anyhow!("Invalid version")); + } + }; + + WATERClientType::Dialer(stream) + } + WaterBinType::Listen => { + let listener = match core.version { + Version::V0(_) => Box::new(v0::listener::WATERListener::init(&conf, core)?) + as Box, + Version::V1 => Box::new(v1::listener::WATERListener::init(&conf, core)?) + as Box, + _ => { + return Err(anyhow::anyhow!("Invalid version")); + } + }; + + WATERClientType::Listener(listener) + } + WaterBinType::Relay => { + // host managed relay is only implemented for v0 + let relay = match core.version { + Version::V0(_) => Box::new(v0::relay::WATERRelay::init(&conf, core)?) + as Box, + _ => { + return Err(anyhow::anyhow!("Invalid version")); + } + }; + + WATERClientType::Relay(relay) + } + WaterBinType::Runner => { + let runner = WATERRunner::init(&conf, core)?; + WATERClientType::Runner(runner) + } + _ => { + return Err(anyhow::anyhow!("Invalid client type")); + } + }; + + Ok(WATERClient { + config: conf, + debug: false, + stream: water, + }) + } + + pub fn set_debug(&mut self, debug: bool) { + self.debug = debug; + } + + pub fn connect(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient connecting ..."); + + match &mut self.stream { + WATERClientType::Dialer(dialer) => { + dialer.connect(&self.config)?; + } + _ => { + return Err(anyhow::anyhow!("[HOST] This client is not a Dialer")); + } + } + Ok(()) + } + + pub fn listen(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient listening ..."); + + match &mut self.stream { + WATERClientType::Listener(listener) => { + listener.listen(&self.config)?; + } + _ => { + return Err(anyhow::anyhow!("[HOST] This client is not a Listener")); + } + } + Ok(()) + } + + pub fn relay(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient relaying ..."); + + match &mut self.stream { + WATERClientType::Relay(relay) => { + relay.relay(&self.config)?; + } + _ => { + return Err(anyhow::anyhow!("[HOST] This client is not a Relay")); + } + } + Ok(()) + } + + pub fn associate(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient relaying ..."); + + match &mut self.stream { + WATERClientType::Relay(relay) => { + relay.associate(&self.config)?; + } + _ => { + return Err(anyhow::anyhow!("[HOST] This client is not a Relay")); + } + } + Ok(()) + } + + pub fn accept(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient accepting ..."); + + match &mut self.stream { + WATERClientType::Listener(listener) => { + listener.accept(&self.config)?; + } + _ => { + return Err(anyhow::anyhow!("[HOST] This client is not a Listener")); + } + } + Ok(()) + } + + // this will start a worker(WATM) in a separate thread -- returns a JoinHandle + pub fn run_worker( + &mut self, + ) -> Result>, anyhow::Error> { + info!("[HOST] WATERClient run_worker ..."); + + match &mut self.stream { + WATERClientType::Dialer(dialer) => dialer.run_entry_fn(&self.config), + WATERClientType::Listener(listener) => { + // TODO: clone listener here, since we are doing one WATM instance / accept + listener.run_entry_fn(&self.config) + } + WATERClientType::Relay(relay) => relay.run_entry_fn(&self.config), + _ => Err(anyhow::anyhow!("This client is not a Runner")), + } + } + + // this will run the extry_fn(WATM) in the current thread -- replace Host when running + pub fn execute(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient Executing ..."); + + match &mut self.stream { + WATERClientType::Runner(runner) => { + runner.run(&self.config)?; + } + WATERClientType::Dialer(dialer) => { + dialer.run_entry_fn(&self.config)?; + } + WATERClientType::Listener(listener) => { + listener.run_entry_fn(&self.config)?; + } + WATERClientType::Relay(relay) => { + relay.run_entry_fn(&self.config)?; + } + } + Ok(()) + } + + // v0 func for Host to set pipe for canceling later + pub fn cancel_with(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient cancel_with ..."); + + match &mut self.stream { + WATERClientType::Dialer(dialer) => { + dialer.cancel_with(&self.config)?; + } + WATERClientType::Listener(listener) => { + listener.cancel_with(&self.config)?; + } + WATERClientType::Relay(relay) => { + relay.cancel_with(&self.config)?; + } + _ => { + // for now this is only implemented for v0 dialer + return Err(anyhow::anyhow!("This client is not a v0 supported client")); + } + } + Ok(()) + } + + // v0 func for Host to terminate the separate thread running worker(WATM) + pub fn cancel(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient canceling ..."); + + match &mut self.stream { + WATERClientType::Dialer(dialer) => { + dialer.cancel(&self.config)?; + } + WATERClientType::Listener(listener) => { + listener.cancel(&self.config)?; + } + WATERClientType::Relay(relay) => { + relay.cancel(&self.config)?; + } + _ => { + // for now this is only implemented for v0 dialer + return Err(anyhow::anyhow!("This client is not a v0 Dialer")); + } + } + Ok(()) + } + + pub fn read(&mut self, buf: &mut Vec) -> Result { + info!("[HOST] WATERClient reading ..."); + + let read_bytes = match &mut self.stream { + WATERClientType::Dialer(dialer) => dialer.read(buf)?, + WATERClientType::Listener(listener) => listener.read(buf)?, + _ => { + return Err(anyhow::anyhow!("This client is not supporting read")); + } + }; + + Ok(read_bytes) + } + + pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient writing ..."); + + match &mut self.stream { + WATERClientType::Dialer(dialer) => { + dialer.write(buf)?; + } + WATERClientType::Listener(listener) => { + listener.write(buf)?; + } + _ => { + return Err(anyhow::anyhow!("This client is not supporting write")); + } + } + Ok(()) + } +} diff --git a/crates/water/src/runtime/core.rs b/crates/water/src/runtime/core.rs index a7630f7..e79b9ac 100644 --- a/crates/water/src/runtime/core.rs +++ b/crates/water/src/runtime/core.rs @@ -1,3 +1,5 @@ +use std::sync::Mutex; + use crate::runtime::*; #[derive(Default, Clone)] @@ -6,13 +8,14 @@ pub struct Host { pub wasi_threads: Option>>, } +#[derive(Clone)] pub struct H2O { pub version: Version, pub engine: Engine, pub linker: Linker, pub instance: Instance, - pub store: Store, + pub store: Arc>>, pub module: Module, } @@ -31,36 +34,47 @@ impl H2O { let host = Host::default(); let mut store = Store::new(&engine, host); + let mut error_occured = None; + + // Get the version global from WATM let version = module.exports().find_map(|global| { - info!( - "[HOST] WATERCore finding exported symbols from WASM bin: {:?}", - global.name() - ); match Version::parse(global.name()) { - Some(v) => { + Some(mut v) => { info!("[HOST] WATERCore found version: {:?}", v.as_str()); - Some(v) + match v { + Version::V0(_) => match v.config_v0(conf) { + Ok(v) => Some(v), + Err(e) => { + info!("[HOST] WATERCore failed to configure for V0: {}", e); + error_occured = Some(e); + None + } + }, + _ => Some(v), // for now only V0 needs to be configured + } } None => None, } }); if version.is_none() { - return Err(anyhow::Error::msg("WASM module version not found")); + if let Some(e) = error_occured { + return Err(e); + } + return Err(anyhow::Error::msg("WATM module version not found")); } - // let path = unsafe { Dir::open_ambient_dir(".", ambient_authority())? }; - - // store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().preopened_dir(path, ".")?.build()); store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build()); if store.data().preview1_ctx.is_none() { - return Err(anyhow::anyhow!("Failed to retrieve preview1_ctx from Host")); + return Err(anyhow::anyhow!( + "[HOST] WATERCore Failed to retrieve preview1_ctx from Host" + )); } wasmtime_wasi::add_to_linker(&mut linker, |h: &mut Host| h.preview1_ctx.as_mut().unwrap())?; - // initializing stuff for multithreading + // initializing stuff for multithreading -- currently not used yet (v1+ feature) #[cfg(feature = "multithread")] { store.data_mut().wasi_threads = Some(Arc::new(WasiThreadsCtx::new( @@ -77,19 +91,44 @@ impl H2O { // export functions -- version dependent -- has to be done before instantiate match &version { - Some(Version::V0) => { - v0::funcs::export_tcp_connect(&mut linker)?; - v0::funcs::export_tcplistener_create(&mut linker)?; - } + Some(Version::V0(ref config)) => match config { + Some(v0_conf) => { + // let v0_conf = Arc::new(Mutex::new(v0_conf.clone())); + v0::funcs::export_tcp_connect(&mut linker, Arc::clone(v0_conf))?; + v0::funcs::export_accept(&mut linker, Arc::clone(v0_conf))?; + v0::funcs::export_defer(&mut linker, Arc::clone(v0_conf))?; + + // // if client_type is Listen, then create a listener with the same config + // if conf.client_type == WaterBinType::Listen { + // match v0_conf.lock() { + // Ok(mut v0_conf) => { + // v0_conf.create_listener()?; + // } + // Err(e) => { + // return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; + // } + // } + // } + } + None => { + return Err(anyhow::anyhow!( + "v0_conf wasn't initialized / setup correctly" + ))?; + } + }, Some(Version::V1) => { v1::funcs::export_tcp_connect(&mut linker)?; v1::funcs::export_tcplistener_create(&mut linker)?; } - _ => {} // add export funcs for other versions here + _ => { + unimplemented!("This version is not supported yet") + } // add export funcs for other versions here } // export functions -- version independent - version_common::funcs::export_config(&mut linker, conf.config_wasm.clone())?; + { + version_common::funcs::export_config(&mut linker, conf.config_wasm.clone())?; + } let instance = linker.instantiate(&mut store, &module)?; @@ -104,29 +143,36 @@ impl H2O { engine, linker, instance, - store, + store: Arc::new(Mutex::new(store)), module, }) } pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { - // NOTE: version has been checked at the very beginning self._init(conf.debug)?; - self._process_config(conf)?; + self._process_config(conf)?; // This is for now needed only by v1_preview Ok(()) } pub fn _init(&mut self, debug: bool) -> Result<(), anyhow::Error> { - info!("[HOST] WATERCore H2O calling _init from WASM..."); + info!("[HOST] WATERCore calling _init from WASM..."); + + let store_lock_result = self.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; - let init_fn = match self.instance.get_func(&mut self.store, INIT_FN) { + let init_fn = match self.instance.get_func(&mut *store, INIT_FN) { Some(func) => func, None => return Err(anyhow::Error::msg("init function not found")), }; - // TODO: check if we need to pass in any arguments / configs later - let params = vec![Val::I32(debug as i32); init_fn.ty(&self.store).params().len()]; - match init_fn.call(&mut self.store, ¶ms, &mut []) { + // check if we need to pass in any arguments / configs later + let params = vec![Val::I32(debug as i32); init_fn.ty(&*store).params().len()]; + let mut res = vec![Val::I64(0); init_fn.ty(&*store).results().len()]; + match init_fn.call(&mut *store, ¶ms, &mut res) { Ok(_) => {} Err(e) => return Err(anyhow::Error::msg(format!("init function failed: {}", e))), } @@ -135,15 +181,23 @@ impl H2O { } pub fn _process_config(&mut self, config: &WATERConfig) -> Result<(), anyhow::Error> { - info!("[HOST] WATERCore H2O calling _process_config from WASM..."); + info!("[HOST] WATERCore calling _process_config from WASM..."); + + let store_lock_result = self.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; // _required to implement _process_config(i32) in WASM, which will be parsing all the configurations - let config_fn = match self.instance.get_func(&mut self.store, CONFIG_FN) { + let config_fn = match self.instance.get_func(&mut *store, CONFIG_FN) { Some(func) => func, None => { - return Err(anyhow::Error::msg( - "_process_config function not found in WASM", - )) + // Currently not going to return error, where V0 don't need config; + // NOTE: remove this function for v1_preview as well, where config will be pulled from WASM + info!("config function not found -- skipping"); + return Ok(()); } }; @@ -164,16 +218,17 @@ impl H2O { let wasi_file = wasmtime_wasi::sync::file::File::from_cap_std(wasi_file); - let ctx = self - .store + let ctx = store .data_mut() .preview1_ctx .as_mut() .ok_or(anyhow::anyhow!("preview1_ctx in Store is None"))?; + + // push the config file into WATM let config_fd = ctx.push_file(Box::new(wasi_file), FileAccessMode::all())? as i32; - let params = vec![Val::I32(config_fd); config_fn.ty(&self.store).params().len()]; - match config_fn.call(&mut self.store, ¶ms, &mut []) { + let params = vec![Val::I32(config_fd); config_fn.ty(&*store).params().len()]; + match config_fn.call(&mut *store, ¶ms, &mut []) { Ok(_) => {} Err(e) => { return Err(anyhow::Error::msg(format!( diff --git a/crates/water/src/runtime/instance.rs b/crates/water/src/runtime/instance.rs deleted file mode 100644 index e69de29..0000000 diff --git a/crates/water/src/runtime/listener.rs b/crates/water/src/runtime/listener.rs index e4b10be..b0ebd3a 100644 --- a/crates/water/src/runtime/listener.rs +++ b/crates/water/src/runtime/listener.rs @@ -1,223 +1,9 @@ -use crate::runtime::*; +use crate::runtime::{transport::WATERTransportTrait, *}; -pub struct WATERListener { - // WASM functions for reading & writing +pub trait WATERListenerTrait: WATERTransportTrait { + fn accept(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error>; - // the reader in WASM (read from net -- n2w) - // returns the number of bytes read - pub reader: Func, - - // the writer in WASM (write to net -- w2n) - // returns the number of bytes written - pub writer: Func, - - pub caller_reader: UnixStream, // the reader in Caller (read from WASM -- w2u) - pub caller_writer: UnixStream, // the writer in Caller (write to WASM -- u2w) - - pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) -} - -impl WATERListener { - /// Read from the target address - pub fn read(&mut self, buf: &mut Vec) -> Result { - info!("[HOST] WATERStream reading..."); - - let mut res = vec![Val::I64(0); self.reader.ty(&self.core.store).results().len()]; - match self.reader.call(&mut self.core.store, &[], &mut res) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "{} function failed: {}", - READER_FN, e - ))) - } - } - - let nums: i64 = match res.get(0) { - Some(wasmtime::Val::I64(v)) => *v, - _ => { - return Err(anyhow::Error::msg(format!( - "{} function returned unexpected type / no return", - READER_FN - ))) - } - }; - - // read from WASM's caller_reader - buf.resize(nums as usize, 0); - match self.caller_reader.read(&mut buf[..]) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "failed to read from caller_reader: {}", - e - ))) - } - } - - Ok(nums) - } - - /// Write to the target address - pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { - info!("[HOST] WATERStream writing..."); - - // write to WASM's caller_writer - match self.caller_writer.write_all(buf) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "failed to write to caller_writer: {}", - e - ))) - } - } - - let params = vec![Val::I64(buf.len() as i64)]; - let mut res = vec![Val::I64(0)]; - match self.writer.call(&mut self.core.store, ¶ms, &mut res) { - Ok(_) => { - match res.get(0) { - Some(wasmtime::Val::I64(v)) => { - if *v != buf.len() as i64 { - return Err(anyhow::Error::msg(format!( - "WASM write function returned unexpected value: {}", - *v - ))); - } - } - _ => { - return Err(anyhow::Error::msg( - "user_write_done function returned unexpected type / no return", - )) - } - }; - } - Err(e) => { - return Err(anyhow::Error::msg(format!( - "{} function failed: {}", - WRITER_FN, e - ))) - } - } - - Ok(()) - } - - /// Listening at the addr:port with running the WASM listen function - pub fn listen( - &mut self, - conf: &WATERConfig, - _addr: &str, - _port: u16, - ) -> Result<(), anyhow::Error> { - info!("[HOST] WATERStream listening..."); - - // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port - let fnc = self - .core - .instance - .get_func(&mut self.core.store, &conf.entry_fn) - .context(format!("Failed to get function {}", &conf.entry_fn))?; - - match fnc.call(&mut self.core.store, &[], &mut []) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "connect function failed: {}", - e - ))) - } - } - - Ok(()) - } - - pub fn init(conf: &WATERConfig) -> Result { - info!("[HOST] WATERStream init..."); - - let mut core = H2O::init(conf)?; - core._prepare(conf)?; - - // constructing 2 pairs of UnixStream for communicating between WASM and Host - // returns (read_end, write_end) for caller - let (caller_reader, water_writer) = UnixStream::pair()?; - let (water_reader, caller_writer) = UnixStream::pair()?; - - let water_write_file = unsafe { cap_std::fs::File::from_raw_fd(water_writer.as_raw_fd()) }; - let water_read_file = unsafe { cap_std::fs::File::from_raw_fd(water_reader.as_raw_fd()) }; - - // insert file here - let wasi_water_reader = wasmtime_wasi::sync::file::File::from_cap_std(water_read_file); - let wasi_water_writer = wasmtime_wasi::sync::file::File::from_cap_std(water_write_file); - - std::mem::forget(water_writer); - std::mem::forget(water_reader); - - let ctx = core - .store - .data_mut() - .preview1_ctx - .as_mut() - .ok_or(anyhow::anyhow!("preview1_ctx in Store is None"))?; - let water_reader_fd = ctx.push_file(Box::new(wasi_water_reader), FileAccessMode::all())?; - let water_writer_fd = ctx.push_file(Box::new(wasi_water_writer), FileAccessMode::all())?; - - let water_bridging = match core.instance.get_func(&mut core.store, WATER_BRIDGING_FN) { - Some(func) => func, - None => { - return Err(anyhow::Error::msg(format!( - "{} function not found in WASM", - WATER_BRIDGING_FN - ))) - } - }; - - let params = vec![ - Val::I32(water_reader_fd as i32), - Val::I32(water_writer_fd as i32), - ]; - match water_bridging.call(&mut core.store, ¶ms, &mut []) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "{} function failed: {}", - WATER_BRIDGING_FN, e - ))) - } - } - - // getting reader & writer func from WASM - let reader = match core.instance.get_func(&mut core.store, READER_FN) { - Some(func) => func, - None => { - return Err(anyhow::Error::msg(format!( - "{} function not found in WASM", - READER_FN - ))) - } - }; - - let writer = match core.instance.get_func(&mut core.store, WRITER_FN) { - Some(func) => func, - None => { - return Err(anyhow::Error::msg(format!( - "{} function not found in WASM", - WRITER_FN - ))) - } - }; - - let runtime = WATERListener { - reader, - writer, - - caller_reader, - caller_writer, - - core, - }; - - Ok(runtime) + fn listen(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + Err(anyhow::anyhow!("Method not supported")) } } diff --git a/crates/water/src/runtime/mod.rs b/crates/water/src/runtime/mod.rs index b190da1..cd7f347 100644 --- a/crates/water/src/runtime/mod.rs +++ b/crates/water/src/runtime/mod.rs @@ -1,9 +1,12 @@ // =================== MODULES =================== +pub mod client; pub mod core; pub mod listener; pub mod net; +pub mod relay; pub mod runner; pub mod stream; +pub mod transport; pub mod v0; pub mod v1; pub mod version; @@ -34,129 +37,14 @@ use wasmtime_wasi_threads::WasiThreadsCtx; // =================== CURRENT CRATE IMPORTS =================== use crate::{ config::{WATERConfig, WaterBinType}, - globals::{CONFIG_FN, DIAL_FN, INIT_FN, READER_FN, WATER_BRIDGING_FN, WRITER_FN}, + globals::{ + ACCEPT_FN, ASSOCIATE_FN, CANCEL_FN, CONFIG_FN, DIAL_FN, INIT_FN, READER_FN, + WATER_BRIDGING_FN, WRITER_FN, + }, }; // =================== MODULES' DEPENDENCIES =================== use self::core::{Host, H2O}; -use self::listener::WATERListener; use self::net::{ConnectFile, File, ListenFile}; use self::runner::WATERRunner; -use self::stream::WATERStream; use self::version::Version; - -// =================== WATERClient Definition =================== -pub enum WATERClientType { - Dialer(WATERStream), - Listener(WATERListener), - Runner(WATERRunner), // This is a customized runner -- not like any stream -} - -pub struct WATERClient { - debug: bool, - - pub config: WATERConfig, - pub stream: WATERClientType, -} - -impl WATERClient { - pub fn new(conf: WATERConfig) -> Result { - // client_type: 0 -> Dialer, 1 -> Listener, 2 -> Runner - let water = match conf.client_type { - WaterBinType::Dial => { - let stream = WATERStream::init(&conf)?; - WATERClientType::Dialer(stream) - } - WaterBinType::Listen => { - let stream = WATERListener::init(&conf)?; - WATERClientType::Listener(stream) - } - WaterBinType::Runner => { - let runner = WATERRunner::init(&conf)?; - WATERClientType::Runner(runner) - } - _ => { - return Err(anyhow::anyhow!("Invalid client type")); - } - }; - - Ok(WATERClient { - config: conf, - debug: false, - stream: water, - }) - } - - pub fn set_debug(&mut self, debug: bool) { - self.debug = debug; - } - - pub fn execute(&mut self) -> Result<(), anyhow::Error> { - info!("[HOST] WATERClient Executing ..."); - - match &mut self.stream { - WATERClientType::Runner(runner) => { - runner.run(&self.config)?; - } - _ => { - return Err(anyhow::anyhow!("This client is not a Runner")); - } - } - Ok(()) - } - - pub fn connect(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> { - info!("[HOST] WATERClient connecting ..."); - - match &mut self.stream { - WATERClientType::Dialer(dialer) => { - dialer.connect(&self.config, addr, port)?; - } - _ => { - return Err(anyhow::anyhow!("This client is not a listener")); - } - } - Ok(()) - } - - pub fn listen(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> { - info!("[HOST] WATERClient listening ..."); - - match &mut self.stream { - WATERClientType::Listener(listener) => { - listener.listen(&self.config, addr, port)?; - } - _ => { - return Err(anyhow::anyhow!("This client is not a listener")); - } - } - Ok(()) - } - - pub fn read(&mut self, buf: &mut Vec) -> Result { - let read_bytes = match self.stream { - WATERClientType::Dialer(ref mut dialer) => dialer.read(buf)?, - WATERClientType::Listener(ref mut listener) => listener.read(buf)?, - _ => { - return Err(anyhow::anyhow!("This client is not supporting read")); - } - }; - - Ok(read_bytes) - } - - pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { - match self.stream { - WATERClientType::Dialer(ref mut dialer) => { - dialer.write(buf)?; - } - WATERClientType::Listener(ref mut listener) => { - listener.write(buf)?; - } - _ => { - return Err(anyhow::anyhow!("This client is not supporting write")); - } - } - Ok(()) - } -} diff --git a/crates/water/src/runtime/relay.rs b/crates/water/src/runtime/relay.rs new file mode 100644 index 0000000..6ba90dc --- /dev/null +++ b/crates/water/src/runtime/relay.rs @@ -0,0 +1,7 @@ +use crate::runtime::{transport::WATERTransportTrait, *}; + +pub trait WATERRelayTrait: WATERTransportTrait { + fn associate(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error>; + + fn relay(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error>; +} diff --git a/crates/water/src/runtime/runner.rs b/crates/water/src/runtime/runner.rs index 7faab96..1e658a8 100644 --- a/crates/water/src/runtime/runner.rs +++ b/crates/water/src/runtime/runner.rs @@ -9,15 +9,22 @@ impl WATERRunner { pub fn run(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { info!("[HOST] WATERRunner running..."); + let store_lock_result = self.core.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; + let fnc = self .core .instance - .get_func(&mut self.core.store, &conf.entry_fn) + .get_func(&mut *store, &conf.entry_fn) .context(format!( "failed to find declared entry function: {}", &conf.entry_fn ))?; - match fnc.call(&mut self.core.store, &[], &mut []) { + match fnc.call(&mut *store, &[], &mut []) { Ok(_) => {} Err(e) => return Err(anyhow::Error::msg(format!("run function failed: {}", e))), } @@ -25,12 +32,9 @@ impl WATERRunner { Ok(()) } - pub fn init(conf: &WATERConfig) -> Result { + pub fn init(_conf: &WATERConfig, core: H2O) -> Result { info!("[HOST] WATERRunner init..."); - let mut core = H2O::init(conf)?; - core._prepare(conf)?; - let runtime = WATERRunner { core }; Ok(runtime) diff --git a/crates/water/src/runtime/stream.rs b/crates/water/src/runtime/stream.rs index cfb1f71..b348c82 100644 --- a/crates/water/src/runtime/stream.rs +++ b/crates/water/src/runtime/stream.rs @@ -1,228 +1,5 @@ -use crate::runtime::*; +use crate::runtime::{transport::WATERTransportTrait, *}; -/// This file contains the WATERStream implementation -/// which is a TcpStream liked definition with utilizing WASM - -// UnixSocket Connection created with Host -// Write => u2w +----------------+ w2n -// ----->| WATERStream |------> -// Caller | WASM Runtime | n2w Destination -// <-----| Decode/Encode |<------ -// Read => w2u +----------------+ -// WATERStream - -pub struct WATERStream { - // WASM functions for reading & writing - - // the reader in WASM (read from net -- n2w) - // returns the number of bytes read - pub reader: Func, - - // the writer in WASM (write to net -- w2n) - // returns the number of bytes written - pub writer: Func, - - pub caller_io: UnixStream, // the pipe for communcating between Host and WASM - - pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) -} - -impl WATERStream { - /// Read from the target address - pub fn read(&mut self, buf: &mut Vec) -> Result { - debug!("[HOST] WATERStream reading..."); - - let mut res = vec![Val::I64(0); self.reader.ty(&self.core.store).results().len()]; - match self.reader.call(&mut self.core.store, &[], &mut res) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "{} function failed: {}", - READER_FN, e - ))) - } - } - - let nums: i64 = match res.get(0) { - Some(wasmtime::Val::I64(v)) => *v, - _ => { - return Err(anyhow::Error::msg(format!( - "{} function returned unexpected type / no return", - READER_FN - ))) - } - }; - - // read from WASM's caller_reader - buf.resize(nums as usize, 0); - match self.caller_io.read(&mut buf[..]) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "failed to read from caller_reader: {}", - e - ))) - } - } - - Ok(nums) - } - - /// Write to the target address - pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { - debug!("[HOST] WATERStream writing..."); - - // write to WASM's caller_writer - match self.caller_io.write_all(buf) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "failed to write to caller_writer: {}", - e - ))) - } - } - - let params = vec![Val::I64(buf.len() as i64)]; - let mut res = vec![Val::I64(0)]; - match self.writer.call(&mut self.core.store, ¶ms, &mut res) { - Ok(_) => { - match res.get(0) { - Some(wasmtime::Val::I64(v)) => { - if *v != buf.len() as i64 { - return Err(anyhow::Error::msg(format!( - "WASM write function returned unexpected value: {}", - *v - ))); - } - } - _ => { - return Err(anyhow::Error::msg( - "user_write_done function returned unexpected type / no return", - )) - } - }; - } - Err(e) => { - return Err(anyhow::Error::msg(format!( - "{} function failed: {}", - WRITER_FN, e - ))) - } - } - - Ok(()) - } - - /// Connect to the target address with running the WASM connect function - pub fn connect( - &mut self, - conf: &WATERConfig, - _addr: &str, - _port: u16, - ) -> Result<(), anyhow::Error> { - info!("[HOST] WATERStream connecting..."); - - // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port - let fnc = match self.core.instance.get_func(&mut self.core.store, DIAL_FN) { - Some(func) => func, - None => { - return Err(anyhow::Error::msg(format!( - "{} function not found in WASM", - conf.entry_fn - ))) - } - }; - - match fnc.call(&mut self.core.store, &[], &mut []) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "connect function failed: {}", - e - ))) - } - } - - Ok(()) - } - - pub fn init(conf: &WATERConfig) -> Result { - info!("[HOST] WATERStream init..."); - - let mut core = H2O::init(conf)?; - core._prepare(conf)?; - - // constructing a pair of UnixStream for communicating between WASM and Host - let (caller_io, water_io) = UnixStream::pair()?; - - let water_io_file = unsafe { cap_std::fs::File::from_raw_fd(water_io.as_raw_fd()) }; - - // insert file here - let water_io_file = wasmtime_wasi::sync::file::File::from_cap_std(water_io_file); - - std::mem::forget(water_io); // forget the water_io, so that it won't be closed - - let ctx = core - .store - .data_mut() - .preview1_ctx - .as_mut() - .context("Failed to retrieve preview1_ctx from Host")?; - let water_io_fd = ctx.push_file(Box::new(water_io_file), FileAccessMode::all())?; - - let water_bridging = match core.instance.get_func(&mut core.store, WATER_BRIDGING_FN) { - Some(func) => func, - None => { - return Err(anyhow::Error::msg(format!( - "{} function not found in WASM", - WATER_BRIDGING_FN - ))) - } - }; - - // let params = vec![Val::I32(water_reader_fd as i32), Val::I32(water_writer_fd as i32)]; - let params = vec![Val::I32(water_io_fd as i32)]; - match water_bridging.call(&mut core.store, ¶ms, &mut []) { - Ok(_) => {} - Err(e) => { - return Err(anyhow::Error::msg(format!( - "{} function failed: {}", - WATER_BRIDGING_FN, e - ))) - } - } - - // getting reader & writer func from WASM - let reader = match core.instance.get_func(&mut core.store, READER_FN) { - Some(func) => func, - None => { - return Err(anyhow::Error::msg(format!( - "{} function not found in WASM", - READER_FN - ))) - } - }; - - let writer = match core.instance.get_func(&mut core.store, WRITER_FN) { - Some(func) => func, - None => { - return Err(anyhow::Error::msg(format!( - "{} function not found in WASM", - WRITER_FN - ))) - } - }; - - let runtime = WATERStream { - reader, - writer, - - caller_io, - - core, - }; - - Ok(runtime) - } +pub trait WATERStreamTrait: WATERTransportTrait { + fn connect(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error>; } diff --git a/crates/water/src/runtime/transport.rs b/crates/water/src/runtime/transport.rs new file mode 100644 index 0000000..86e079d --- /dev/null +++ b/crates/water/src/runtime/transport.rs @@ -0,0 +1,219 @@ +use std::thread::JoinHandle; + +use crate::runtime::*; + +pub trait WATERTransportTrait: Send { + // ============================ all version ============================ + fn read(&mut self, buf: &mut Vec) -> Result { + info!("[HOST] WATERTransport v0 reading..."); + + let caller_io = self.get_caller_io(); + + // read from WASM's caller_reader + match caller_io { + Some(ref mut caller_io) => match caller_io.read(buf) { + Ok(n) => Ok(n as i64), + Err(e) => Err(anyhow::Error::msg(format!( + "failed to read from caller_reader: {}", + e + ))), + }, + None => Err(anyhow::Error::msg(format!( + "read function failed: {}", + "caller_io is None" + ))), + } + } + + fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { + info!("[HOST] WATERTransport v0 writing..."); + + let caller_io = self.get_caller_io(); + + // write to WASM's caller_writer + match caller_io { + Some(ref mut caller_io) => match caller_io.write_all(buf) { + Ok(_) => Ok(()), + Err(e) => Err(anyhow::Error::msg(format!( + "failed to write to caller_writer: {}", + e + ))), + }, + None => Err(anyhow::Error::msg(format!( + "write function failed: {}", + "caller_io is None" + ))), + } + } + + // ============================ v0 only ============================ + // Methods to provide access to the shared state, not implemented by default + fn get_caller_io(&mut self) -> &mut Option { + unimplemented!("get_caller_io not implemented") + } + fn get_cancel_io(&mut self) -> &mut Option { + unimplemented!("get_cancel_io not implemented") + } + fn get_core(&mut self) -> &mut H2O { + unimplemented!("get_core not implemented") + } + + fn set_caller_io(&mut self, _caller_io: Option) { + unimplemented!("set_caller_io not implemented") + } + fn set_cancel_io(&mut self, _cancel_io: Option) { + unimplemented!("set_cancel_io not implemented") + } + + fn cancel_with(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERTransport v0 cancel_with..."); + + let (caller_io, water_io) = UnixStream::pair()?; + + self.set_cancel_io(Some(caller_io)); + + let water_io_file = unsafe { cap_std::fs::File::from_raw_fd(water_io.as_raw_fd()) }; + + // insert file here + let water_io_file = wasmtime_wasi::sync::file::File::from_cap_std(water_io_file); + + std::mem::forget(water_io); // forget the water_io, so that it won't be closed + + let core = self.get_core(); + + let mut store = core + .store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + + let ctx = store + .data_mut() + .preview1_ctx + .as_mut() + .context("Failed to retrieve preview1_ctx from Host")?; + + let water_io_fd = ctx.push_file(Box::new(water_io_file), FileAccessMode::all())?; + + let _water_cancel_with = match core.instance.get_func(&mut *store, CANCEL_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + CANCEL_FN + ))) + } + }; + + let params: Vec = vec![Val::I32(water_io_fd as i32)]; + let mut res = vec![Val::I32(0); _water_cancel_with.ty(&*store).results().len()]; + match _water_cancel_with.call(&mut *store, ¶ms, &mut res) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + CANCEL_FN, e + ))) + } + } + + if res[0].unwrap_i32() != 0 { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + CANCEL_FN, "connection failed" + ))); + } + + Ok(()) + } + + fn cancel(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERTransport v0 cancel..."); + + let cancel_io = self.get_cancel_io(); + + match cancel_io { + Some(ref mut cancel_io) => { + // write anything to cancel + match cancel_io.write_all(&[0]) { + Ok(_) => Ok(()), + Err(e) => Err(anyhow::Error::msg(format!( + "failed to write to cancel_io: {}", + e + ))), + } + } + None => Err(anyhow::Error::msg(format!( + "cancel function failed: {}", + "cancel_io is None" + ))), + } + } + + fn run_entry_fn( + &mut self, + conf: &WATERConfig, + ) -> Result>, anyhow::Error> { + info!( + "[HOST] WATERTransport v0 running entry_fn {}...", + conf.entry_fn + ); + + let core = self.get_core(); + + let store = Arc::clone(&core.store); + let entry_fn = { + let mut store = store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + match core.instance.get_func(&mut *store, conf.entry_fn.as_str()) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + conf.entry_fn + ))) + } + } + }; + + // run the entry_fn in a thread -- Host will still have the ability to control it (e.g. with cancel) + let handle = std::thread::spawn(move || { + let mut store = store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + let mut res = vec![Val::I32(0); entry_fn.ty(&mut *store).results().len()]; + match entry_fn.call(&mut *store, &[], &mut res) { + Ok(_) => Ok(()), + Err(e) => Err(anyhow::Error::msg(format!("function failed: {}", e))), + } + }); + + Ok(handle) + } + + // fn read(&mut self, _buf: &mut Vec) -> Result { + // Err(anyhow::anyhow!("Method not supported")) + // } + + // fn write(&mut self, _buf: &[u8]) -> Result<(), anyhow::Error> { + // Err(anyhow::anyhow!("Method not supported")) + // } + + // // v0 only + // fn cancel_with(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + // Err(anyhow::anyhow!("Method not supported")) + // } + + // // v0 only + // fn cancel(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + // Err(anyhow::anyhow!("Method not supported")) + // } + + // // v0 only + // fn run_entry_fn( + // &mut self, + // _conf: &WATERConfig, + // ) -> Result>, anyhow::Error> { + // Err(anyhow::anyhow!("Method not supported")) + // } +} diff --git a/crates/water/src/runtime/v0/config.rs b/crates/water/src/runtime/v0/config.rs new file mode 100644 index 0000000..45b9617 --- /dev/null +++ b/crates/water/src/runtime/v0/config.rs @@ -0,0 +1,169 @@ +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd}; + +use anyhow::Context; +use serde::Deserialize; +use tracing::info; + +// A Config currently contains the local + remote ip & port +#[derive(Debug, Deserialize, Clone)] +pub struct Config { + pub local_address: String, + pub local_port: u32, + pub remote_address: String, + pub remote_port: u32, +} + +impl Default for Config { + fn default() -> Self { + Self::new() + } +} + +// implement a constructor for the config +impl Config { + pub fn new() -> Self { + Config { + local_address: String::from("127.0.0.1"), + local_port: 8080, + remote_address: String::from("example.com"), + remote_port: 8082, + } + } + + pub fn from(config_file: &str) -> Result { + let config_file = + std::fs::read_to_string(config_file).context("failed to read config file")?; + // let config: Config = json::from_str(&config_file).context("failed to parse config file")?; + + let config: Config = match serde_json::from_str(&config_file) { + Ok(config) => config, + Err(e) => { + eprintln!("[WASM] > _process_config ERROR: {}", e); + return Err(anyhow::Error::msg("failed to parse config file")); + } + }; + + Ok(config) + } +} + +#[derive(Debug, Clone)] +pub enum V0CRole { + Unknown, + Dialer(i32), + Listener(i32), + Relay(i32, i32), // listener_fd, dialer_fd +} + +// V0 specific configurations +// The addr:port pair will either be local / remote depend on the client_type +#[derive(Debug, Clone)] +pub struct V0Config { + pub name: String, + pub loc_addr: String, + pub loc_port: u32, + + pub remote_addr: String, + pub remote_port: u32, + + pub conn: V0CRole, +} + +impl V0Config { + pub fn init( + name: String, + loc_addr: String, + loc_port: u32, + remote_addr: String, + remote_port: u32, + ) -> Result { + Ok(V0Config { + name, + loc_addr, + loc_port, + remote_addr, + remote_port, + conn: V0CRole::Unknown, + }) + } + + pub fn connect(&mut self) -> Result { + let addr = format!("{}:{}", self.remote_addr, self.remote_port); + + info!("[HOST] WATERCore V0 connecting to {}", addr); + + match &mut self.conn { + V0CRole::Relay(_lis, ref mut conn_fd) => { + // now relay has been built, need to dial + let conn = std::net::TcpStream::connect(addr)?; + *conn_fd = conn.as_raw_fd(); + Ok(conn) + } + V0CRole::Unknown => { + let conn = std::net::TcpStream::connect(addr)?; + self.conn = V0CRole::Dialer(conn.as_raw_fd()); + Ok(conn) + } + _ => Err(anyhow::Error::msg("not a dialer")), + } + } + + pub fn create_listener(&mut self, is_relay: bool) -> Result<(), anyhow::Error> { + let addr = format!("{}:{}", self.loc_addr, self.loc_port); + + info!("[HOST] WATERCore V0 creating listener on {}", addr); + + let listener = std::net::TcpListener::bind(addr)?; + + if is_relay { + self.conn = V0CRole::Relay(listener.into_raw_fd(), 0); + } else { + self.conn = V0CRole::Listener(listener.into_raw_fd()); + } + Ok(()) + } + + pub fn accept(&mut self) -> Result { + info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn); + + match &self.conn { + V0CRole::Listener(listener) => { + let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + let (stream, _) = listener.accept()?; + self.conn = V0CRole::Listener(listener.into_raw_fd()); // makde sure it is not closed after scope + Ok(stream) + } + V0CRole::Relay(listener, _) => { + let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + let (stream, _) = listener.accept()?; + self.conn = V0CRole::Relay(listener.into_raw_fd(), 0); // makde sure it is not closed after scope + Ok(stream) + } + _ => Err(anyhow::Error::msg("not a listener")), + } + } + + pub fn defer(&mut self) { + info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn); + + match &self.conn { + V0CRole::Listener(_listener) => { + // TODO: Listener shouldn't be deferred, but the stream it connected to should be + // let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + // drop(listener); + } + V0CRole::Dialer(conn) => { + let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) }; + drop(conn); + } + V0CRole::Relay(_listener, conn) => { + // Listener shouldn't be deferred, like the above reason + // let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + // drop(listener); + let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) }; + drop(conn); + } + _ => {} + } + } +} diff --git a/crates/water/src/runtime/v0/funcs.rs b/crates/water/src/runtime/v0/funcs.rs index b54480a..c25af7b 100644 --- a/crates/water/src/runtime/v0/funcs.rs +++ b/crates/water/src/runtime/v0/funcs.rs @@ -1,60 +1,25 @@ -use anyhow::Ok; - -use crate::config::wasm_shared_config::StreamConfig; +use crate::runtime::v0::config::V0Config; use crate::runtime::*; -use std::convert::TryInto; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::sync::{Arc, Mutex}; -// TODO: rename this to dial_v1, since it has the ability to let WASM choose ip:port -pub fn export_tcp_connect(linker: &mut Linker) -> Result<(), anyhow::Error> { +pub fn export_tcp_connect( + linker: &mut Linker, + config: Arc>, +) -> Result<(), anyhow::Error> { linker .func_wrap( "env", - "connect_tcp", - move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32 { - info!("[WASM] invoking Host exported Dial func connect_tcp..."); - - let memory = match caller.get_export("memory") { - Some(Extern::Memory(memory)) => memory, - _ => return -1, - }; - - // Get a slice of the memory. - let mem_slice = memory.data_mut(&mut caller); - - // Use the offset and size to get the relevant part of the memory. - let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; + "host_dial", + move |mut caller: Caller<'_, Host>| -> i32 { + info!("[WASM] invoking host_dial v0 ..."); - let config: StreamConfig = - bincode::deserialize(data).expect("Failed to deserialize"); + let mut config = config.lock().unwrap(); - let connect_file = File::Connect(ConnectFile::Tcp { - name: Some(config.name.clone().try_into().unwrap()), - port: config.port as u16, - host: config.addr.clone(), - }); - - // Get the pair here addr:port - let (host, port) = match connect_file { - File::Connect(listen_file) => match listen_file { - ConnectFile::Tcp { host, port, .. } - | ConnectFile::Tls { host, port, .. } => (host, port), - }, - _ => ("Wrong".into(), 0), - }; - - let tcp = match (host.as_str(), port) { - ("localhost", port) => std::net::TcpStream::connect(SocketAddr::V4( - SocketAddrV4::new(Ipv4Addr::LOCALHOST, port), - )), - addr => std::net::TcpStream::connect(addr), - } - .map(TcpStream::from_std) - .context(format!( - "Failed to connect to {}:{} in Host exported dial", - host, port - )) - .unwrap(); + let tcp = config + .connect() + .map(TcpStream::from_std) + .context("failed to connect to endpoint") + .unwrap(); // Connecting Tcp let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); @@ -75,49 +40,26 @@ pub fn export_tcp_connect(linker: &mut Linker) -> Result<(), anyhow::Error Ok(()) } -// TODO: rename this to dial_v1, since it has the ability to let WASM listen on a TcpListener -pub fn export_tcplistener_create(linker: &mut Linker) -> Result<(), anyhow::Error> { +pub fn export_accept( + linker: &mut Linker, + config: Arc>, +) -> Result<(), anyhow::Error> { linker .func_wrap( "env", - "create_listen", - move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32 { - info!("[WASM] invoking Host exported Dial func create_tcp_listener..."); - - let memory = match caller.get_export("memory") { - Some(Extern::Memory(memory)) => memory, - _ => return -1, - }; + "host_accept", + move |mut caller: Caller<'_, Host>| -> i32 { + info!("[WASM] invoking host_accept v0 ..."); - // Get a slice of the memory. - let mem_slice = memory.data_mut(&mut caller); + let mut config = config.lock().unwrap(); - // Use the offset and size to get the relevant part of the memory. - let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; - - let config: StreamConfig = - bincode::deserialize(data).expect("Failed to deserialize"); - - let listener_file = File::Listen(ListenFile::Tcp { - name: config.name.clone().try_into().unwrap(), - port: config.port as u16, - addr: config.addr.clone(), - }); - - // Get the pair here addr:port - let (addr, port) = match listener_file { - File::Listen(listen_file) => match listen_file { - ListenFile::Tcp { addr, port, .. } | ListenFile::Tls { addr, port, .. } => { - (addr, port) - } - }, - _ => ("Wrong".into(), 0), - }; + let tcp = config + .accept() + .map(TcpStream::from_std) + .context("failed to accept") + .unwrap(); - // Creating Tcp Listener - let tcp = std::net::TcpListener::bind((addr.as_str(), port)).unwrap(); - let tcp = TcpListener::from_std(tcp); - // tcp.set_nonblocking(true); + // Connecting Tcp let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); // Get the WasiCtx of the caller(WASM), then insert_file into it @@ -136,19 +78,19 @@ pub fn export_tcplistener_create(linker: &mut Linker) -> Result<(), anyhow Ok(()) } -// Generically link dial functions -// pub fn linkDialFns(linker: &mut Linker) { -// let network = vec!["tcplistener", "tlslistener", "udp"]; +// TODO: implement this +pub fn export_defer( + linker: &mut Linker, + config: Arc>, +) -> Result<(), anyhow::Error> { + linker + .func_wrap("env", "host_defer", move |_caller: Caller<'_, Host>| { + info!("[WASM] invoking host_defer v0 ..."); -// for net in &network { -// match linker.func_wrap("env", &format!("connect_{}", net), move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32{ -// // TODO: get addr from WASM + let mut config = config.lock().unwrap(); -// let socket_fd = dialer.Dial(net, addr).unwrap(); -// socket_fd -// }) { -// Ok(_) => {}, -// Err(e) => { eprintln!("Failed to define function: {}", e) }, -// }; -// } -// } + config.defer(); + }) + .context("Failed to export defer function to WASM")?; + Ok(()) +} diff --git a/crates/water/src/runtime/v0/listener.rs b/crates/water/src/runtime/v0/listener.rs new file mode 100644 index 0000000..685f507 --- /dev/null +++ b/crates/water/src/runtime/v0/listener.rs @@ -0,0 +1,132 @@ +use crate::runtime::{listener::WATERListenerTrait, transport::WATERTransportTrait, *}; + +pub struct WATERListener { + pub caller_io: Option, // the pipe for communcating between Host and WASM + pub cancel_io: Option, // the UnixStream side for communcating between Host and WASM + + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) +} + +// impl WATERTransportTrait for WATERListener {} + +impl WATERTransportTrait for WATERListener { + fn get_caller_io(&mut self) -> &mut Option { + &mut self.caller_io + } + + fn get_cancel_io(&mut self) -> &mut Option { + &mut self.cancel_io + } + + fn get_core(&mut self) -> &mut H2O { + &mut self.core + } + + fn set_caller_io(&mut self, caller_io: Option) { + self.caller_io = caller_io; + } + + fn set_cancel_io(&mut self, cancel_io: Option) { + self.cancel_io = cancel_io; + } +} + +impl WATERListenerTrait for WATERListener { + /// Connect to the target address with running the WASM connect function + fn listen(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERListener v0 create listener..."); + + if let Version::V0(v0_conf) = &mut self.core.version { + match v0_conf { + Some(v0_conf) => match v0_conf.lock() { + Ok(mut v0_conf) => { + v0_conf.create_listener(false)?; + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; + } + }, + None => { + return Err(anyhow::anyhow!("v0_conf is None"))?; + } + } + } + + Ok(()) + } + + fn accept(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERListener v0 accepting..."); + + let (caller_io, water_io) = UnixStream::pair()?; + self.caller_io = Some(caller_io); + + // push the WATM end of the Unixpipe to WATM + let water_io_file = unsafe { cap_std::fs::File::from_raw_fd(water_io.as_raw_fd()) }; + + // insert file here + let water_io_file = wasmtime_wasi::sync::file::File::from_cap_std(water_io_file); + + std::mem::forget(water_io); // forget the water_io, so that it won't be closed + + let mut store = self + .core + .store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + + let ctx = store + .data_mut() + .preview1_ctx + .as_mut() + .context("Failed to retrieve preview1_ctx from Host")?; + + let water_io_fd = ctx.push_file(Box::new(water_io_file), FileAccessMode::all())?; + + let _water_accept = match self.core.instance.get_func(&mut *store, ACCEPT_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + ACCEPT_FN + ))) + } + }; + + // calling the WASM dial function + let params: Vec = vec![Val::I32(water_io_fd as i32)]; + let mut res = vec![Val::I32(0); _water_accept.ty(&*store).results().len()]; + match _water_accept.call(&mut *store, ¶ms, &mut res) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + ACCEPT_FN, e + ))) + } + } + + if res[0].unwrap_i32() < 0 { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + ACCEPT_FN, "connection failed" + ))); + } + + Ok(()) + } +} + +impl WATERListener { + pub fn init(_conf: &WATERConfig, core: H2O) -> Result { + info!("[HOST] WATERListener v0 init..."); + + let runtime = WATERListener { + caller_io: None, + cancel_io: None, + core, + }; + + Ok(runtime) + } +} diff --git a/crates/water/src/runtime/v0/mod.rs b/crates/water/src/runtime/v0/mod.rs index 5ef91ae..85e0d9e 100644 --- a/crates/water/src/runtime/v0/mod.rs +++ b/crates/water/src/runtime/v0/mod.rs @@ -1 +1,5 @@ +pub mod config; pub mod funcs; +pub mod listener; +pub mod relay; +pub mod stream; diff --git a/crates/water/src/runtime/v0/relay.rs b/crates/water/src/runtime/v0/relay.rs new file mode 100644 index 0000000..b97fea1 --- /dev/null +++ b/crates/water/src/runtime/v0/relay.rs @@ -0,0 +1,111 @@ +use crate::runtime::{relay::WATERRelayTrait, transport::WATERTransportTrait, *}; + +pub struct WATERRelay { + pub caller_io: Option, // the pipe for communcating between Host and WASM + pub cancel_io: Option, // the UnixStream side for communcating between Host and WASM + + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) +} + +impl WATERTransportTrait for WATERRelay { + fn get_caller_io(&mut self) -> &mut Option { + &mut self.caller_io + } + + fn get_cancel_io(&mut self) -> &mut Option { + &mut self.cancel_io + } + + fn get_core(&mut self) -> &mut H2O { + &mut self.core + } + + fn set_caller_io(&mut self, caller_io: Option) { + self.caller_io = caller_io; + } + + fn set_cancel_io(&mut self, cancel_io: Option) { + self.cancel_io = cancel_io; + } +} + +impl WATERRelayTrait for WATERRelay { + /// Connect to the target address with running the WASM connect function + fn associate(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERRelay v0 associating..."); + + let mut store = self + .core + .store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + + let _water_associate = match self.core.instance.get_func(&mut *store, ASSOCIATE_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + ASSOCIATE_FN + ))) + } + }; + + // calling the WATM associate function + let mut res = vec![Val::I32(0); _water_associate.ty(&*store).results().len()]; + match _water_associate.call(&mut *store, &[], &mut res) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + ASSOCIATE_FN, e + ))) + } + } + + if res[0].unwrap_i32() < 0 { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + ASSOCIATE_FN, "connection failed" + ))); + } + + Ok(()) + } + + fn relay(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERRelay v0 relaying..."); + + // create listener + if let Version::V0(v0_conf) = &mut self.core.version { + match v0_conf { + Some(v0_conf) => match v0_conf.lock() { + Ok(mut v0_conf) => { + v0_conf.create_listener(true)?; + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; + } + }, + None => { + return Err(anyhow::anyhow!("v0_conf is None"))?; + } + } + } + + Ok(()) + } +} + +impl WATERRelay { + pub fn init(_conf: &WATERConfig, core: H2O) -> Result { + info!("[HOST] WATERRelay v0 init..."); + + let runtime = WATERRelay { + caller_io: None, + cancel_io: None, + core, + }; + + Ok(runtime) + } +} diff --git a/crates/water/src/runtime/v0/stream.rs b/crates/water/src/runtime/v0/stream.rs new file mode 100644 index 0000000..6cde53b --- /dev/null +++ b/crates/water/src/runtime/v0/stream.rs @@ -0,0 +1,122 @@ +use crate::runtime::{stream::WATERStreamTrait, transport::WATERTransportTrait, *}; +// use crate::runtime::{stream::WATERStreamTrait, *, v0::transport::WATERTransportTraitV0, transport::WATERTransportTrait}; + +/// This file contains the WATERStream implementation +/// which is a TcpStream liked definition with utilizing WASM + +// UnixSocket Connection created with Host +// Write => u2w +----------------+ w2n +// ----->| WATERStream |------> +// Caller | WASM Runtime | n2w Destination +// <-----| Decode/Encode |<------ +// Read => w2u +----------------+ +// WATERStream + +pub struct WATERStream { + pub caller_io: Option, // the pipe for communcating between Host and WASM + pub cancel_io: Option, // the UnixStream side for communcating between Host and WASM + + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) +} + +// impl WATERTransportTrait for WATERStream {} + +impl WATERTransportTrait for WATERStream { + fn get_caller_io(&mut self) -> &mut Option { + &mut self.caller_io + } + + fn get_cancel_io(&mut self) -> &mut Option { + &mut self.cancel_io + } + + fn get_core(&mut self) -> &mut H2O { + &mut self.core + } + + fn set_caller_io(&mut self, caller_io: Option) { + self.caller_io = caller_io; + } + + fn set_cancel_io(&mut self, cancel_io: Option) { + self.cancel_io = cancel_io; + } +} + +impl WATERStreamTrait for WATERStream { + /// Connect to the target address with running the WASM connect function + fn connect(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERStream v0 connecting..."); + + let (caller_io, water_io) = UnixStream::pair()?; + self.caller_io = Some(caller_io); + + // push the WATM end of the Unixpipe to WATM + let water_io_file = unsafe { cap_std::fs::File::from_raw_fd(water_io.as_raw_fd()) }; + + // insert file here + let water_io_file = wasmtime_wasi::sync::file::File::from_cap_std(water_io_file); + + std::mem::forget(water_io); // forget the water_io, so that it won't be closed + + let mut store = self + .core + .store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + + let ctx = store + .data_mut() + .preview1_ctx + .as_mut() + .context("Failed to retrieve preview1_ctx from Host")?; + + let water_io_fd = ctx.push_file(Box::new(water_io_file), FileAccessMode::all())?; + + let _water_dial = match self.core.instance.get_func(&mut *store, DIAL_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + DIAL_FN + ))) + } + }; + + // calling the WASM dial function + let params: Vec = vec![Val::I32(water_io_fd as i32)]; + let mut res = vec![Val::I32(0); _water_dial.ty(&*store).results().len()]; + match _water_dial.call(&mut *store, ¶ms, &mut res) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + DIAL_FN, e + ))) + } + } + + if res[0].unwrap_i32() < 0 { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + DIAL_FN, "connection failed" + ))); + } + + Ok(()) + } +} + +impl WATERStream { + pub fn init(_conf: &WATERConfig, core: H2O) -> Result { + info!("[HOST] WATERStream v0 init..."); + + let runtime = WATERStream { + caller_io: None, + cancel_io: None, + core, + }; + + Ok(runtime) + } +} diff --git a/crates/water/src/runtime/v1/funcs.rs b/crates/water/src/runtime/v1/funcs.rs index b54480a..be4a7e9 100644 --- a/crates/water/src/runtime/v1/funcs.rs +++ b/crates/water/src/runtime/v1/funcs.rs @@ -5,7 +5,6 @@ use crate::runtime::*; use std::convert::TryInto; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -// TODO: rename this to dial_v1, since it has the ability to let WASM choose ip:port pub fn export_tcp_connect(linker: &mut Linker) -> Result<(), anyhow::Error> { linker .func_wrap( @@ -75,7 +74,6 @@ pub fn export_tcp_connect(linker: &mut Linker) -> Result<(), anyhow::Error Ok(()) } -// TODO: rename this to dial_v1, since it has the ability to let WASM listen on a TcpListener pub fn export_tcplistener_create(linker: &mut Linker) -> Result<(), anyhow::Error> { linker .func_wrap( @@ -135,20 +133,3 @@ pub fn export_tcplistener_create(linker: &mut Linker) -> Result<(), anyhow .context("Failed to export TcpListener create function to WASM")?; Ok(()) } - -// Generically link dial functions -// pub fn linkDialFns(linker: &mut Linker) { -// let network = vec!["tcplistener", "tlslistener", "udp"]; - -// for net in &network { -// match linker.func_wrap("env", &format!("connect_{}", net), move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32{ -// // TODO: get addr from WASM - -// let socket_fd = dialer.Dial(net, addr).unwrap(); -// socket_fd -// }) { -// Ok(_) => {}, -// Err(e) => { eprintln!("Failed to define function: {}", e) }, -// }; -// } -// } diff --git a/crates/water/src/runtime/v1/listener.rs b/crates/water/src/runtime/v1/listener.rs new file mode 100644 index 0000000..b18fdcf --- /dev/null +++ b/crates/water/src/runtime/v1/listener.rs @@ -0,0 +1,252 @@ +use crate::runtime::{listener::WATERListenerTrait, transport::WATERTransportTrait, *}; + +pub struct WATERListener { + // WASM functions for reading & writing + + // the reader in WASM (read from net -- n2w) + // returns the number of bytes read + pub reader: Func, + + // the writer in WASM (write to net -- w2n) + // returns the number of bytes written + pub writer: Func, + + pub caller_reader: UnixStream, // the reader in Caller (read from WASM -- w2u) + pub caller_writer: UnixStream, // the writer in Caller (write to WASM -- u2w) + + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) +} + +impl WATERTransportTrait for WATERListener { + /// Read from the target address + fn read(&mut self, buf: &mut Vec) -> Result { + info!("[HOST] WATERListener v1_preview reading..."); + + let store_lock_result = self.core.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; + + let mut res = vec![Val::I64(0); self.reader.ty(&*store).results().len()]; + match self.reader.call(&mut *store, &[], &mut res) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + READER_FN, e + ))) + } + } + + let nums: i64 = match res.get(0) { + Some(wasmtime::Val::I64(v)) => *v, + _ => { + return Err(anyhow::Error::msg(format!( + "{} function returned unexpected type / no return", + READER_FN + ))) + } + }; + + // read from WASM's caller_reader + buf.resize(nums as usize, 0); + match self.caller_reader.read(&mut buf[..]) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to read from caller_reader: {}", + e + ))) + } + } + + Ok(nums) + } + + /// Write to the target address + fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { + info!("[HOST] WATERListener v1_preview writing..."); + + // write to WASM's caller_writer + match self.caller_writer.write_all(buf) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to write to caller_writer: {}", + e + ))) + } + } + + let store_lock_result = self.core.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; + + let params = vec![Val::I64(buf.len() as i64)]; + let mut res = vec![Val::I64(0)]; + match self.writer.call(&mut *store, ¶ms, &mut res) { + Ok(_) => { + match res.get(0) { + Some(wasmtime::Val::I64(v)) => { + if *v != buf.len() as i64 { + return Err(anyhow::Error::msg(format!( + "WASM write function returned unexpected value: {}", + *v + ))); + } + } + _ => { + return Err(anyhow::Error::msg( + "user_write_done function returned unexpected type / no return", + )) + } + }; + } + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WRITER_FN, e + ))) + } + } + + Ok(()) + } +} + +// impl WATERTransportTraitV1 for WATERListener {} + +impl WATERListenerTrait for WATERListener { + /// Listening at the addr:port with running the WASM listen function + fn accept(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERListener v1_preview listening..."); + + let store_lock_result = self.core.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; + + // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port + let fnc = self + .core + .instance + .get_func(&mut *store, &conf.entry_fn) + .context(format!("Failed to get function {}", &conf.entry_fn))?; + + match fnc.call(&mut *store, &[], &mut []) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "connect function failed: {}", + e + ))) + } + } + + Ok(()) + } +} + +impl WATERListener { + pub fn init(_conf: &WATERConfig, core: H2O) -> Result { + info!("[HOST] WATERListener v1_preview init..."); + + // constructing 2 pairs of UnixStream for communicating between WASM and Host + // returns (read_end, write_end) for caller + let (caller_reader, water_writer) = UnixStream::pair()?; + let (water_reader, caller_writer) = UnixStream::pair()?; + + let water_write_file = unsafe { cap_std::fs::File::from_raw_fd(water_writer.as_raw_fd()) }; + let water_read_file = unsafe { cap_std::fs::File::from_raw_fd(water_reader.as_raw_fd()) }; + + // insert file here + let wasi_water_reader = wasmtime_wasi::sync::file::File::from_cap_std(water_read_file); + let wasi_water_writer = wasmtime_wasi::sync::file::File::from_cap_std(water_write_file); + + std::mem::forget(water_writer); + std::mem::forget(water_reader); + + let reader; + let writer; + + { + let mut store = core + .store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + let ctx = store + .data_mut() + .preview1_ctx + .as_mut() + .ok_or(anyhow::anyhow!("preview1_ctx in Store is None"))?; + let water_reader_fd = + ctx.push_file(Box::new(wasi_water_reader), FileAccessMode::all())?; + let water_writer_fd = + ctx.push_file(Box::new(wasi_water_writer), FileAccessMode::all())?; + + let water_bridging = match core.instance.get_func(&mut *store, WATER_BRIDGING_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WATER_BRIDGING_FN + ))) + } + }; + + let params = vec![ + Val::I32(water_reader_fd as i32), + Val::I32(water_writer_fd as i32), + ]; + match water_bridging.call(&mut *store, ¶ms, &mut []) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WATER_BRIDGING_FN, e + ))) + } + } + + // getting reader & writer func from WASM + reader = match core.instance.get_func(&mut *store, READER_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + READER_FN + ))) + } + }; + + writer = match core.instance.get_func(&mut *store, WRITER_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WRITER_FN + ))) + } + }; + } + + let runtime = WATERListener { + reader, + writer, + + caller_reader, + caller_writer, + + core, + }; + + Ok(runtime) + } +} diff --git a/crates/water/src/runtime/v1/mod.rs b/crates/water/src/runtime/v1/mod.rs index 5ef91ae..4f82022 100644 --- a/crates/water/src/runtime/v1/mod.rs +++ b/crates/water/src/runtime/v1/mod.rs @@ -1 +1,3 @@ pub mod funcs; +pub mod listener; +pub mod stream; diff --git a/crates/water/src/runtime/v1/stream.rs b/crates/water/src/runtime/v1/stream.rs new file mode 100644 index 0000000..a33c3a4 --- /dev/null +++ b/crates/water/src/runtime/v1/stream.rs @@ -0,0 +1,252 @@ +use crate::runtime::{stream::WATERStreamTrait, transport::WATERTransportTrait, *}; + +/// This file contains the WATERStream implementation +/// which is a TcpStream liked definition with utilizing WASM + +// UnixSocket Connection created with Host +// Write => u2w +----------------+ w2n +// ----->| WATERStream |------> +// Caller | WASM Runtime | n2w Destination +// <-----| Decode/Encode |<------ +// Read => w2u +----------------+ +// WATERStream + +pub struct WATERStream { + // WASM functions for reading & writing + + // the reader in WASM (read from net -- n2w) + // returns the number of bytes read + pub reader: Func, + + // the writer in WASM (write to net -- w2n) + // returns the number of bytes written + pub writer: Func, + + pub caller_io: UnixStream, // the pipe for communcating between Host and WASM + + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) +} + +impl WATERTransportTrait for WATERStream { + /// Read from the target address + fn read(&mut self, buf: &mut Vec) -> Result { + debug!("[HOST] WATERStream v1_preview reading..."); + + let store_lock_result = self.core.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; + + let mut res = vec![Val::I64(0); self.reader.ty(&*store).results().len()]; + match self.reader.call(&mut *store, &[], &mut res) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + READER_FN, e + ))) + } + } + + let nums: i64 = match res.get(0) { + Some(wasmtime::Val::I64(v)) => *v, + _ => { + return Err(anyhow::Error::msg(format!( + "{} function returned unexpected type / no return", + READER_FN + ))) + } + }; + + // read from WASM's caller_reader + buf.resize(nums as usize, 0); + match self.caller_io.read(&mut buf[..]) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to read from caller_reader: {}", + e + ))) + } + } + + Ok(nums) + } + + /// Write to the target address + fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { + debug!("[HOST] WATERStream v1_preview writing..."); + + let store_lock_result = self.core.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; + + // write to WASM's caller_writer + match self.caller_io.write_all(buf) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to write to caller_writer: {}", + e + ))) + } + } + + let params = vec![Val::I64(buf.len() as i64)]; + let mut res = vec![Val::I64(0)]; + match self.writer.call(&mut *store, ¶ms, &mut res) { + Ok(_) => { + match res.get(0) { + Some(wasmtime::Val::I64(v)) => { + if *v != buf.len() as i64 { + return Err(anyhow::Error::msg(format!( + "WASM write function returned unexpected value: {}", + *v + ))); + } + } + _ => { + return Err(anyhow::Error::msg( + "user_write_done function returned unexpected type / no return", + )) + } + }; + } + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WRITER_FN, e + ))) + } + } + + Ok(()) + } +} + +impl WATERStreamTrait for WATERStream { + /// Connect to the target address with running the WASM connect function + fn connect(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { + info!("[HOST] WATERStream v1_preview connecting..."); + + let store_lock_result = self.core.store.lock(); + + let mut store = match store_lock_result { + Ok(store) => store, + Err(e) => return Err(anyhow::Error::msg(format!("Failed to lock store: {}", e))), + }; + + let fnc = match self.core.instance.get_func(&mut *store, DIAL_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + DIAL_FN + ))) + } + }; + + match fnc.call(&mut *store, &[], &mut []) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "connect function failed: {}", + e + ))) + } + } + + Ok(()) + } +} + +impl WATERStream { + pub fn init(_conf: &WATERConfig, core: H2O) -> Result { + info!("[HOST] WATERStream v1_preview..."); + + // constructing a pair of UnixStream for communicating between WASM and Host + let (caller_io, water_io) = UnixStream::pair()?; + + let water_io_file = unsafe { cap_std::fs::File::from_raw_fd(water_io.as_raw_fd()) }; + + // insert file here + let water_io_file = wasmtime_wasi::sync::file::File::from_cap_std(water_io_file); + + std::mem::forget(water_io); // forget the water_io, so that it won't be closed + + let reader; + let writer; + + { + let mut store = core + .store + .lock() + .map_err(|e| anyhow::Error::msg(format!("Failed to lock store: {}", e)))?; + + let ctx = store + .data_mut() + .preview1_ctx + .as_mut() + .context("Failed to retrieve preview1_ctx from Host")?; + let water_io_fd = ctx.push_file(Box::new(water_io_file), FileAccessMode::all())?; + + let water_bridging = match core.instance.get_func(&mut *store, WATER_BRIDGING_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WATER_BRIDGING_FN + ))) + } + }; + + let params: Vec = vec![Val::I32(water_io_fd as i32)]; + match water_bridging.call(&mut *store, ¶ms, &mut []) { + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WATER_BRIDGING_FN, e + ))) + } + } + + // getting reader & writer func from WASM + reader = match core.instance.get_func(&mut *store, READER_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + READER_FN + ))) + } + }; + + writer = match core.instance.get_func(&mut *store, WRITER_FN) { + Some(func) => func, + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WRITER_FN + ))) + } + }; + } + + let runtime = WATERStream { + reader, + writer, + + caller_io, + + core, + }; + + Ok(runtime) + } +} diff --git a/crates/water/src/runtime/version.rs b/crates/water/src/runtime/version.rs index 15518a8..20d0f71 100644 --- a/crates/water/src/runtime/version.rs +++ b/crates/water/src/runtime/version.rs @@ -1,8 +1,14 @@ use std::fmt; use std::str::FromStr; +use std::sync::Mutex; +use crate::runtime::v0::config::{Config, V0Config}; +use crate::runtime::*; + +#[derive(Clone)] pub enum Version { - V0, + Unknown, + V0(Option>>), V1, V2, } @@ -15,11 +21,60 @@ impl Version { } } + // Current API v0 needs some configurations at the beginning + pub fn config_v0(&mut self, conf: &WATERConfig) -> Result { + info!("[HOST] WATERCore configuring for V0"); + + let wasm_config = Config::from(&conf.config_wasm)?; + + let v = match conf.client_type { + WaterBinType::Dial => { + let v0_conf = V0Config::init( + "CONNECT".into(), + wasm_config.local_address.clone(), + wasm_config.local_port, + wasm_config.remote_address.clone(), + wasm_config.remote_port, + )?; + Version::V0(Some(Arc::new(Mutex::new(v0_conf)))) + } + WaterBinType::Listen => { + let v0_conf = V0Config::init( + "LISTEN".into(), + wasm_config.local_address.clone(), + wasm_config.local_port, + wasm_config.remote_address.clone(), + wasm_config.remote_port, + )?; + Version::V0(Some(Arc::new(Mutex::new(v0_conf)))) + } + WaterBinType::Relay => { + let v0_conf = V0Config::init( + "RELAY".into(), + wasm_config.local_address.clone(), + wasm_config.local_port, + wasm_config.remote_address.clone(), + wasm_config.remote_port, + )?; + Version::V0(Some(Arc::new(Mutex::new(v0_conf)))) + } + WaterBinType::Unknown => { + Version::Unknown // WATER is setting up? + } + _ => { + unimplemented!("This client type is not supported yet") + } + }; + + Ok(v) + } + pub fn as_str(&self) -> &'static str { - match *self { - Version::V0 => "V0", - Version::V1 => "V1", - Version::V2 => "V2", + match self { + Version::Unknown => "_water_setting_up", + Version::V0(_v0_conf) => "_water_v0", + Version::V1 => "_water_v1", + Version::V2 => "_water_v2", } } } @@ -29,9 +84,9 @@ impl FromStr for Version { fn from_str(s: &str) -> Result { match s { - "V0" => Ok(Version::V0), - "V1" => Ok(Version::V1), - "V2" => Ok(Version::V2), + "_water_v0" => Ok(Version::V0(None)), + "_water_v1" => Ok(Version::V1), + "_water_v2" => Ok(Version::V2), _ => Err(()), } } @@ -40,9 +95,10 @@ impl FromStr for Version { impl From<&Version> for &'static str { fn from(v: &Version) -> &'static str { match v { - Version::V0 => "V0", - Version::V1 => "V1", - Version::V2 => "V2", + Version::Unknown => "_water_setting_up", + Version::V0(_v0_conf) => "_water_v0", + Version::V1 => "_water_v1", + Version::V2 => "_water_v2", } } } diff --git a/crates/water/src/runtime/version_common/funcs.rs b/crates/water/src/runtime/version_common/funcs.rs index 98a2029..bf17bc3 100644 --- a/crates/water/src/runtime/version_common/funcs.rs +++ b/crates/water/src/runtime/version_common/funcs.rs @@ -5,7 +5,7 @@ pub fn export_config(linker: &mut Linker, config_file: String) -> Result<( linker .func_wrap( "env", - "request_config", + "pull_config", move |mut caller: Caller<'_, Host>| -> i32 { info!("[WASM] invoking Host exported request_config ..."); diff --git a/examples/water_bins/echo_client/echo_client.wasm b/examples/water_bins/echo_client/echo_client.wasm index aa5b026..ca67e57 100755 Binary files a/examples/water_bins/echo_client/echo_client.wasm and b/examples/water_bins/echo_client/echo_client.wasm differ diff --git a/examples/water_bins/echo_client/src/lib.rs b/examples/water_bins/echo_client/src/lib.rs index a13e8d8..7cc2942 100644 --- a/examples/water_bins/echo_client/src/lib.rs +++ b/examples/water_bins/echo_client/src/lib.rs @@ -11,7 +11,7 @@ use water_wasm::*; pub mod async_socks5_listener; // Export the version of this WASM module -#[export_name = "V1"] +#[export_name = "_water_v1"] pub static V1: i32 = 0; // create a mutable global variable stores a pointer to the config @@ -21,7 +21,7 @@ lazy_static! { } #[cfg(target_family = "wasm")] -#[export_name = "_init"] +#[export_name = "_water_init"] pub fn _init(debug: bool) { if debug { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); @@ -39,7 +39,7 @@ pub fn _init(debug: bool) { info!("[WASM] running in _init"); } -#[export_name = "_set_inbound"] +#[export_name = "_water_set_inbound"] pub fn _water_bridging(fd: i32) { let mut global_dialer = match DIALER.lock() { Ok(dialer) => dialer, @@ -55,7 +55,7 @@ pub fn _water_bridging(fd: i32) { ); } -#[export_name = "_set_outbound"] +#[export_name = "_water_set_outbound"] pub fn _water_bridging_out(fd: i32) { let mut global_dialer = match DIALER.lock() { Ok(dialer) => dialer, @@ -71,7 +71,7 @@ pub fn _water_bridging_out(fd: i32) { ); } -#[export_name = "_config"] +#[export_name = "_water_config"] pub fn _process_config(fd: i32) { info!("[WASM] running in _process_config"); @@ -107,7 +107,7 @@ pub fn _process_config(fd: i32) { }; } -#[export_name = "_write"] +#[export_name = "_water_write"] pub fn _write(bytes_write: i64) -> i64 { let mut global_dialer = match DIALER.lock() { Ok(dialer) => dialer, @@ -129,7 +129,7 @@ pub fn _write(bytes_write: i64) -> i64 { } } -#[export_name = "_read"] +#[export_name = "_water_read"] pub fn _read() -> i64 { match DIALER.lock() { Ok(mut global_dialer) => { @@ -151,7 +151,7 @@ pub fn _read() -> i64 { } } -#[export_name = "_dial"] +#[export_name = "_water_dial"] pub fn _dial() { match DIALER.lock() { Ok(mut global_dialer) => { diff --git a/examples/water_bins/ss_client_wasm_v1/src/lib.rs b/examples/water_bins/ss_client_wasm_v1/src/lib.rs index e72cd1c..bf293f7 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/lib.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/lib.rs @@ -24,7 +24,7 @@ use futures::ready; use lazy_static::lazy_static; use pin_project::pin_project; use tokio::{ - io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, + io::{copy_bidirectional, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, net::{TcpListener, TcpStream}, }; use tracing::{debug, info, Level}; @@ -49,7 +49,7 @@ use water_wasm::*; use shadowsocks_crypto::{v1::random_iv_or_salt, v1::Cipher, CipherKind}; // Export version info -#[export_name = "V1"] +#[export_name = "_water_v1"] pub static V1: i32 = 0; // create a mutable global variable stores a pointer to the config diff --git a/examples/water_bins/ss_client_wasm_v1/src/socks5.rs b/examples/water_bins/ss_client_wasm_v1/src/socks5.rs index 4bd1802..5f1a151 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/socks5.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/socks5.rs @@ -1,6 +1,7 @@ use super::*; use bytes::{BufMut, BytesMut}; +use std::fmt::Formatter; use std::net::{SocketAddrV4, SocketAddrV6}; #[rustfmt::skip] @@ -36,6 +37,26 @@ impl Address { } } +impl Debug for Address { + #[inline] + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match *self { + Address::SocketAddress(ref addr) => write!(f, "{addr}"), + Address::DomainNameAddress(ref addr, ref port) => write!(f, "{addr}:{port}"), + } + } +} + +impl fmt::Display for Address { + #[inline] + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match *self { + Address::SocketAddress(ref addr) => write!(f, "{addr}"), + Address::DomainNameAddress(ref addr, ref port) => write!(f, "{addr}:{port}"), + } + } +} + #[inline] pub fn get_addr_len(atyp: &Address) -> usize { match *atyp { diff --git a/examples/water_bins/ss_client_wasm_v1/src/utils.rs b/examples/water_bins/ss_client_wasm_v1/src/utils.rs index 6f95a3a..d67dba4 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/utils.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/utils.rs @@ -225,3 +225,32 @@ where } .await } + +pub(crate) async fn establish_tcp_tunnel_bypassed( + plain: &mut P, + shadow: &mut S, + target_addr: &Address, +) -> io::Result<()> +where + P: AsyncRead + AsyncWrite + Unpin, + S: AsyncRead + AsyncWrite + Unpin, +{ + info!("established tcp tunnel to {} bypassed", target_addr); + + match copy_bidirectional(plain, shadow).await { + Ok((rn, wn)) => { + info!( + "tcp tunnel to {} (bypassed) closed, L2R {} bytes, R2L {} bytes", + target_addr, rn, wn + ); + } + Err(err) => { + info!( + "tcp tunnel to {} (bypassed) closed with error: {}", + target_addr, err + ); + } + } + + Ok(()) +} diff --git a/examples/water_bins/ss_client_wasm_v1/src/water.rs b/examples/water_bins/ss_client_wasm_v1/src/water.rs index 3e1f6a7..6b8b5f9 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/water.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/water.rs @@ -3,7 +3,7 @@ use super::*; use bytes::{BufMut, BytesMut}; #[cfg(target_family = "wasm")] -#[export_name = "_init"] +#[export_name = "_water_init"] pub fn _init(debug: bool) { if debug { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); @@ -21,7 +21,7 @@ pub fn _init(debug: bool) { info!("[WASM] running in _init"); } -#[export_name = "_config"] +#[export_name = "_water_config"] pub fn _process_config(fd: i32) { info!("[WASM] running in _process_config"); @@ -37,16 +37,17 @@ pub fn _process_config(fd: i32) { } }; - let mut global_dialer = match DIALER.lock() { - Ok(dialer) => dialer, + let mut global_conn = match CONN.lock() { + Ok(conn) => conn, Err(e) => { eprintln!("[WASM] > ERROR: {}", e); return; } }; - // global_dialer.file_conn.config = config.clone(); - global_dialer.config = config; + global_conn.config = config; + + info!("[WASM] > _process_config: {:?}", global_conn.config); } Err(e) => { eprintln!( @@ -60,11 +61,19 @@ pub fn _process_config(fd: i32) { /// WASM Entry point here #[export_name = "v1_listen"] fn client_start() { - _start_listen().unwrap(); + let bypass = match CONN.lock() { + Ok(conn) => conn.config.bypass, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return; + } + }; + + _start_listen(bypass).unwrap(); } #[tokio::main(flavor = "current_thread")] -async fn _start_listen() -> std::io::Result<()> { +async fn _start_listen(bypass: bool) -> std::io::Result<()> { let fd = _listener_creation().unwrap(); // Set up pre-established listening socket. @@ -89,7 +98,7 @@ async fn _start_listen() -> std::io::Result<()> { // Spawn a background task for each new connection. tokio::spawn(async move { eprintln!("[WASM] > CONNECTED"); - match _handle_connection(socket).await { + match _handle_connection(socket, bypass).await { Ok(()) => eprintln!("[WASM] > DISCONNECTED"), Err(e) => eprintln!("[WASM] > ERROR: {}", e), } @@ -98,7 +107,7 @@ async fn _start_listen() -> std::io::Result<()> { } // SS handle incoming connections -async fn _handle_connection(stream: TcpStream) -> std::io::Result<()> { +async fn _handle_connection(stream: TcpStream, bypass: bool) -> std::io::Result<()> { let mut inbound_con = Socks5Handler::new(stream); inbound_con.socks5_greet().await.expect("Failed to greet"); @@ -106,11 +115,23 @@ async fn _handle_connection(stream: TcpStream) -> std::io::Result<()> { .socks5_get_target() .await .expect("Failed to get target address"); - let server_stream = _dial_server().expect("Failed to dial to SS-Server"); + // if proxied { + if bypass { + _connect_bypass(&target_addr, &mut inbound_con).await?; + } else { + _connect(target_addr, &mut inbound_con).await?; + } + + Ok(()) +} + +async fn _connect(target_addr: Address, inbound_con: &mut Socks5Handler) -> std::io::Result<()> { // FIXME: hardcoded server ip:address for now + only support connection with ip:port let server_addr = Address::SocketAddress(SocketAddr::from(([127, 0, 0, 1], 8388))); + let server_stream = _dial_remote(&server_addr).expect("Failed to dial to SS-Server"); + // Constructing the response header let mut buf = BytesMut::with_capacity(server_addr.serialized_len()); buf.put_slice(&[consts::SOCKS5_VERSION, consts::SOCKS5_REPLY_SUCCEEDED, 0x00]); @@ -141,15 +162,52 @@ async fn _handle_connection(stream: TcpStream) -> std::io::Result<()> { Ok(()) } -pub fn _dial_server() -> Result { - // NOTE: dial to SS-Server +async fn _connect_bypass( + target_addr: &Address, + inbound_con: &mut Socks5Handler, +) -> std::io::Result<()> { + let mut target_stream = _dial_remote(target_addr).expect("Failed to dial to SS-Server"); + + // Constructing the response header + let mut buf = BytesMut::with_capacity(target_addr.serialized_len()); + buf.put_slice(&[consts::SOCKS5_VERSION, consts::SOCKS5_REPLY_SUCCEEDED, 0x00]); + target_addr.write_to_buf(&mut buf); + + inbound_con.socks5_response(&mut buf).await; + + match establish_tcp_tunnel_bypassed(&mut inbound_con.stream, &mut target_stream, target_addr) + .await + { + Ok(()) => { + info!("tcp tunnel (bypassed) closed"); + } + Err(err) => { + eprintln!("tcp tunnel (proxied) closed with error: {}", err); + } + } + + Ok(()) +} + +pub fn _dial_remote(target: &Address) -> Result { let mut tcp_dialer = Dialer::new(); - // FIXME: Hardcoded server ip:port for now - tcp_dialer.config.remote_address = "127.0.0.1".to_string(); - tcp_dialer.config.remote_port = 8388; + // NOTE: only support ip:port for now, add DNS resolver helper from Host later + match target { + Address::SocketAddress(addr) => { + tcp_dialer.config.remote_address = addr.ip().to_string(); + tcp_dialer.config.remote_port = addr.port() as u32; + } + _ => { + eprintln!("Failed to get target address"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Failed to get target address", + )); + } + } - let _tcp_fd = tcp_dialer.dial().expect("Failed to dial"); + let _tcp_fd: i32 = tcp_dialer.dial().expect("Failed to dial"); let server_stream = match tcp_dialer.file_conn.outbound_conn.file.unwrap() { ConnStream::TcpStream(s) => s, @@ -174,38 +232,6 @@ pub fn _dial_server() -> Result { Ok(server_stream) } -#[cfg(feature = "direct_connect")] -pub fn _direct_connect() { - // create a new Dialer to dial any target address as it wants to - // Add more features later -- connect to target thru rules (direct / server) - // Connect to target address directly - { - let mut tcp_dialer = Dialer::new(); - tcp_dialer.config.remote_address = addr.ip().to_string(); - tcp_dialer.config.remote_port = addr.port() as u32; - - let tcp_fd = tcp_dialer.dial().expect("Failed to dial"); - - let server_stream = match tcp_dialer.file_conn.outbound_conn.file.unwrap() { - ConnStream::TcpStream(s) => s, - _ => { - eprintln!("Failed to get outbound tcp stream"); - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Failed to get outbound tcp stream", - )); - } - }; - - server_stream - .set_nonblocking(true) - .expect("Failed to set non-blocking"); - - let server_stream = - TcpStream::from_std(server_stream).expect("Failed to convert to tokio stream"); - } -} - pub fn _listener_creation() -> Result { let global_conn = match CONN.lock() { Ok(conf) => conf, @@ -218,6 +244,11 @@ pub fn _listener_creation() -> Result { } }; + info!( + "[WASM] creating listener at {}:{}", + global_conn.config.local_address, global_conn.config.local_port + ); + // FIXME: hardcoded the filename for now, make it a config later let stream = StreamConfigV1::init( global_conn.config.local_address.clone(), diff --git a/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm b/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm index 3995030..11e5042 100644 Binary files a/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm and b/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm differ diff --git a/plain.wasm b/plain.wasm new file mode 100644 index 0000000..77e1350 Binary files /dev/null and b/plain.wasm differ diff --git a/tests/benches/benchmarking_v0.rs b/tests/benches/benchmarking_v0.rs index a2148f5..b2c9a83 100644 --- a/tests/benches/benchmarking_v0.rs +++ b/tests/benches/benchmarking_v0.rs @@ -73,8 +73,8 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { config::WaterBinType::Dial, true, )?; - let mut water_client = runtime::WATERClient::new(conf)?; - water_client.connect("", 0)?; + let mut water_client = runtime::client::WATERClient::new(conf)?; + water_client.connect()?; // let mut water_client = TcpStream::connect(("127.0.0.1", 8088))?; diff --git a/tests/test_data/config.json b/tests/test_data/config.json index f621cfc..221d467 100644 --- a/tests/test_data/config.json +++ b/tests/test_data/config.json @@ -2,5 +2,6 @@ "remote_address": "127.0.0.1", "remote_port": 8080, "local_address": "127.0.0.1", - "local_port": 8088 + "local_port": 8088, + "bypass": false } \ No newline at end of file diff --git a/tests/test_wasm/echo_client.wasm b/tests/test_wasm/echo_client.wasm index e0c2079..ca67e57 100644 Binary files a/tests/test_wasm/echo_client.wasm and b/tests/test_wasm/echo_client.wasm differ diff --git a/tests/test_wasm/plain.wasm b/tests/test_wasm/plain.wasm new file mode 100644 index 0000000..8c5a90b Binary files /dev/null and b/tests/test_wasm/plain.wasm differ diff --git a/tests/test_wasm/ss_client_wasm.wasm b/tests/test_wasm/ss_client_wasm.wasm index 3995030..11e5042 100644 Binary files a/tests/test_wasm/ss_client_wasm.wasm and b/tests/test_wasm/ss_client_wasm.wasm differ diff --git a/tests/tests/cross_lang_tests.rs b/tests/tests/cross_lang_tests.rs new file mode 100644 index 0000000..fe341a1 --- /dev/null +++ b/tests/tests/cross_lang_tests.rs @@ -0,0 +1,179 @@ +#![allow(dead_code)] + +use water::*; + +use tracing::Level; + +use std::{ + fs::File, + io::{Error, ErrorKind, Read, Write}, + net::{TcpListener, TcpStream}, + vec, +}; + +use tempfile::tempdir; + +#[test] +fn test_cross_lang_wasm_dialer() -> Result<(), Box> { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 8080, + "local_address": "127.0.0.1", + "local_port": 8088, + "bypass": false + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + let test_message = b"hello"; + let handle = std::thread::spawn(|| { + // let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + let listener = TcpListener::bind(("127.0.0.1", 8080)).unwrap(); + let (mut socket, _) = listener.accept().unwrap(); + let mut buf = [0; 1024]; + let res = socket.read(&mut buf); + + assert!(res.is_ok()); + let read_bytes = res.unwrap(); + + assert_eq!(read_bytes, test_message.len()); + let res = socket.write(&buf[..read_bytes]); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), test_message.len()); + }); + + let conf = config::WATERConfig::init( + // plain.wasm is in v0 and fully compatible with the Go engine + // More details for the Go-side of running plain.wasm check here: + // https://github.com/gaukas/water/tree/master/examples/v0/plain + // + // More details for the implementation of plain.wasm check this PR: + // https://github.com/erikziyunchi/water-rs/pull/10 + // + String::from("./test_wasm/plain.wasm"), + String::from("_water_worker"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Dial, + true, + ) + .unwrap(); + + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + water_client.connect().unwrap(); + water_client.cancel_with().unwrap(); + + let handle_water = water_client.run_worker().unwrap(); + water_client.write(test_message).unwrap(); + + let mut buf = vec![0; 32]; + let res = water_client.read(&mut buf); + assert!(res.is_ok()); + assert_eq!(res.unwrap() as usize, test_message.len()); + + water_client.cancel().unwrap(); + + drop(file); + dir.close()?; + handle.join().unwrap(); + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(Box::new(Error::new( + ErrorKind::Other, + "Failed to join _water_worker thread", + ))); + } + }; + + Ok(()) +} + +#[test] +fn test_cross_lang_wasm_listener() -> Result<(), Box> { + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 8088, + "local_address": "127.0.0.1", + "local_port": 8082, + "bypass": false + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + let test_message = b"hello"; + + let conf = config::WATERConfig::init( + // plain.wasm is in v0 and fully compatible with the Go engine + // More details for the Go-side of running plain.wasm check here: + // https://github.com/gaukas/water/tree/master/examples/v0/plain + // + // More details for the implementation of plain.wasm check this PR: + // https://github.com/erikziyunchi/water-rs/pull/10 + // + String::from("./test_wasm/plain.wasm"), + String::from("_water_worker"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Listen, + true, + ) + .unwrap(); + + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + water_client.listen().unwrap(); + + let handle = std::thread::spawn(|| { + // give some time let the listener start to accept + std::thread::sleep(std::time::Duration::from_secs(1)); + let mut stream = TcpStream::connect(("127.0.0.1", 8082)).unwrap(); + let res = stream.write(test_message); + + assert!(res.is_ok()); + let write_bytes = res.unwrap(); + + assert_eq!(write_bytes, test_message.len()); + }); + + water_client.accept().unwrap(); + + water_client.cancel_with().unwrap(); + + let handle_water = water_client.run_worker().unwrap(); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + let mut buf = vec![0; 32]; + let res = water_client.read(&mut buf); + assert!(res.is_ok()); + assert_eq!(res.unwrap() as usize, test_message.len()); + + water_client.cancel().unwrap(); + + drop(file); + dir.close()?; + handle.join().unwrap(); + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(Box::new(Error::new( + ErrorKind::Other, + "Failed to join _water_worker thread", + ))); + } + }; + + Ok(()) +} diff --git a/tests/tests/echo_tests.rs b/tests/tests/echo_tests.rs index 7030308..f3abf77 100644 --- a/tests/tests/echo_tests.rs +++ b/tests/tests/echo_tests.rs @@ -15,7 +15,8 @@ fn test_echo() -> Result<(), Box> { "remote_address": "127.0.0.1", "remote_port": 8080, "local_address": "127.0.0.1", - "local_port": 8088 + "local_port": 8088, + "bypass": false } "#; // Create a directory inside of `std::env::temp_dir()`. @@ -43,15 +44,15 @@ fn test_echo() -> Result<(), Box> { let conf = config::WATERConfig::init( String::from("./test_wasm/echo_client.wasm"), - String::from("_init"), + String::from("_water_init"), String::from(file_path.to_string_lossy()), config::WaterBinType::Dial, true, ) .unwrap(); - let mut water_client = runtime::WATERClient::new(conf).unwrap(); - water_client.connect("127.0.0.1", 8080).unwrap(); + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + water_client.connect().unwrap(); water_client.write(test_message).unwrap(); let mut buf = vec![0; 32]; diff --git a/tests/tests/spinning_relay.rs b/tests/tests/spinning_relay.rs new file mode 100644 index 0000000..cedd557 --- /dev/null +++ b/tests/tests/spinning_relay.rs @@ -0,0 +1,181 @@ +#![allow(dead_code)] + +use water::*; + +use tracing::Level; + +use std::{ + fs::File, + io::{Error, ErrorKind, Read, Write}, + net::{TcpListener, TcpStream}, +}; + +use tempfile::tempdir; + +#[test] +fn test_cross_lang_wasm_relay() -> Result<(), Box> { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 8088, + "local_address": "127.0.0.1", + "local_port": 8080, + "bypass": false + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + let test_message = b"hello"; + + // starting the listener in another thread it to relay to + let handle_remote = std::thread::spawn(|| { + let listener = TcpListener::bind(("127.0.0.1", 8088)).unwrap(); + let (mut socket, _) = listener.accept().unwrap(); + let mut buf = [0; 1024]; + let res = socket.read(&mut buf); + + assert!(res.is_ok()); + let read_bytes = res.unwrap(); + assert_eq!(read_bytes, test_message.len()); + + let res = socket.write(&buf[..read_bytes]); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), test_message.len()); + }); + + let conf = config::WATERConfig::init( + // plain.wasm is in v0 and fully compatible with the Go engine + // More details for the Go-side of running plain.wasm check here: + // https://github.com/gaukas/water/tree/master/examples/v0/plain + // + // More details for the implementation of plain.wasm check this PR: + // https://github.com/erikziyunchi/water-rs/pull/10 + // + String::from("./test_wasm/plain.wasm"), + String::from("_water_worker"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Relay, + true, + ) + .unwrap(); + + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + + water_client.relay().unwrap(); + + // connects to the relay, and the relay will connect to the listener + let handle_local = std::thread::spawn(|| { + // give some time let the listener start to accept + std::thread::sleep(std::time::Duration::from_secs(1)); + let mut stream = TcpStream::connect(("127.0.0.1", 8080)).unwrap(); + + let res = stream.write(test_message); + assert!(res.is_ok()); + let write_bytes = res.unwrap(); + assert_eq!(write_bytes, test_message.len()); + + let mut buf = [0; 1024]; + let res = stream.read(&mut buf); + assert!(res.is_ok()); + let read_bytes = res.unwrap(); + assert_eq!(read_bytes, test_message.len()); + }); + + water_client.associate().unwrap(); + water_client.cancel_with().unwrap(); + + let handle_water = water_client.run_worker().unwrap(); + + // give it a second before cancel to let the connector check correct transfer + std::thread::sleep(std::time::Duration::from_secs(2)); + + water_client.cancel().unwrap(); + + drop(file); + dir.close()?; + handle_remote.join().unwrap(); + handle_local.join().unwrap(); + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(Box::new(Error::new( + ErrorKind::Other, + "Failed to join _water_worker thread", + ))); + } + }; + + Ok(()) +} + +// A test that do nothing but just spin up the relay for 20 seconds +// #[test] +fn spin_cross_lang_wasm_relay() -> Result<(), Box> { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 5201, + "local_address": "127.0.0.1", + "local_port": 8082, + "bypass": false + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + let conf = config::WATERConfig::init( + // plain.wasm is in v0 and fully compatible with the Go engine + // More details for the Go-side of running plain.wasm check here: + // https://github.com/gaukas/water/tree/master/examples/v0/plain + // + // More details for the implementation of plain.wasm check this PR: + // https://github.com/erikziyunchi/water-rs/pull/10 + // + String::from("./test_wasm/plain.wasm"), + String::from("_water_worker"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Relay, + true, + ) + .unwrap(); + + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + + water_client.relay().unwrap(); + + water_client.associate().unwrap(); + water_client.cancel_with().unwrap(); + + let handle_water = water_client.run_worker().unwrap(); + + std::thread::sleep(std::time::Duration::from_secs(20)); + + water_client.cancel().unwrap(); + + drop(file); + dir.close()?; + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(Box::new(Error::new( + ErrorKind::Other, + "Failed to join _water_worker thread", + ))); + } + }; + + Ok(()) +} diff --git a/tests/tests/ss_testing.rs b/tests/tests/ss_testing.rs index a1e7867..8e7fa1d 100644 --- a/tests/tests/ss_testing.rs +++ b/tests/tests/ss_testing.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use water::*; // use rand; @@ -6,9 +8,13 @@ use water::*; use tracing::Level; +use tempfile::tempdir; + use std::thread; use std::{ - net::{SocketAddr, ToSocketAddrs}, + fs::File, + io::Write, + net::{IpAddr, SocketAddr, ToSocketAddrs}, str, }; use tokio::{ @@ -111,7 +117,7 @@ impl Socks5TestServer { // "#; #[tokio::test] -async fn wasm_managed_shadowsocks_async() { +async fn wasm_managed_shadowsocks_async() -> Result<(), Box> { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); // ==== setup official Shadowsocks server ==== @@ -121,6 +127,21 @@ async fn wasm_managed_shadowsocks_async() { const PASSWORD: &str = "Test!23"; const METHOD: CipherKind = CipherKind::CHACHA20_POLY1305; + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 8088, + "local_address": "127.0.0.1", + "local_port": 8080, + "bypass": false + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + let svr = Socks5TestServer::new(SERVER_ADDR, LOCAL_ADDR, PASSWORD, METHOD, false); svr.run().await; @@ -128,13 +149,16 @@ async fn wasm_managed_shadowsocks_async() { let conf = config::WATERConfig::init( String::from("./test_wasm/ss_client_wasm.wasm"), String::from("v1_listen"), - String::from("./test_data/config.json"), + // Currently using a temp file to pass config to WASM client + // can be easily configed here -- but can also use config.json + String::from(file_path.to_string_lossy()), + // String::from("./test_data/config.json"), config::WaterBinType::Runner, true, ) .unwrap(); - let mut water_client = runtime::WATERClient::new(conf).unwrap(); + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); // ==== spawn a thread to run WASM Shadowsocks client ==== thread::spawn(move || { @@ -165,4 +189,111 @@ async fn wasm_managed_shadowsocks_async() { let http_status = b"HTTP/1.0 200 OK\r\n"; assert!(buf.starts_with(http_status)); + + Ok(()) +} + +#[tokio::test] +async fn wasm_managed_shadowsocks_bypass_async() -> Result<(), Box> { + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 0, + "local_address": "127.0.0.1", + "local_port": 8888, + "bypass": true + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + // ==== setup WASM Shadowsocks client ==== + let conf = config::WATERConfig::init( + String::from("./test_wasm/ss_client_wasm.wasm"), + String::from("v1_listen"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Runner, + true, + ) + .unwrap(); + + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + + // ==== spawn a thread to run WASM Shadowsocks client ==== + thread::spawn(move || { + water_client.execute().unwrap(); + }); + + // Give some time for the WASM client to start + thread::sleep(Duration::from_millis(1000)); + + let wasm_ss_client_addr = SocketAddr::new("127.0.0.1".parse().unwrap(), 8888); + + // ==== test WASM Shadowsocks client ==== + // currently only support connect by ip, + // this is the ip of detectportal.firefox.com + let ip: IpAddr = "143.244.220.150".parse().unwrap(); + let port = 80; + + let mut c = Socks5TcpClient::connect( + Address::SocketAddress(SocketAddr::new(ip, port)), + wasm_ss_client_addr, + ) + .await + .unwrap(); + + let req = b"GET /success.txt HTTP/1.0\r\nHost: detectportal.firefox.com\r\nAccept: */*\r\n\r\n"; + c.write_all(req).await.unwrap(); + c.flush().await.unwrap(); + + let mut r = BufReader::new(c); + + let mut buf = Vec::new(); + r.read_until(b'\n', &mut buf).await.unwrap(); + + let http_status = b"HTTP/1.0 200 OK\r\n"; + assert!(buf.starts_with(http_status)); + + Ok(()) +} + +// Here is a test that runs the ss_client that has to be ended with signal +// #[test] +fn execute_wasm_shadowsocks_client() -> Result<(), Box> { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + + let cfg_str = r#" + { + "remote_address": "138.197.211.159", + "remote_port": 5201, + "local_address": "127.0.0.1", + "local_port": 8080, + "bypass": true + } + "#; + + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + // ==== setup WASM Shadowsocks client ==== + let conf = config::WATERConfig::init( + String::from("./test_wasm/ss_client_wasm.wasm"), + String::from("v1_listen"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Runner, + false, + ) + .unwrap(); + + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + + water_client.execute().unwrap(); + + Ok(()) }