diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 46371a6..16222df 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -33,7 +33,7 @@ jobs: run: cargo clippy --workspace --all-targets --verbose --all-features build: - name: Test and Build + name: Test and Build WASM needs: format # needs: [format, lint] strategy: @@ -45,7 +45,31 @@ jobs: steps: - uses: actions/checkout@v3 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + # Build for WATER host + - name: Build WATER Host crate + run: cargo build + working-directory: crates/water + + # install wasm32-wasi target + - name: Add wasm32-wasi target + run: rustup target add wasm32-wasi + + # Build for wasm32-wasi target + - name: Build wasm32-wasi Target + run: | + for member in crates/wasm/ examples/water_bins/ss_client_wasm_v1/ examples/water_bins/echo_client/; do + cargo build --verbose --manifest-path $member/Cargo.toml --target wasm32-wasi + done + env: + RUSTFLAGS: --cfg tokio_unstable + - name: Test run: cargo test --verbose --workspace --all-features - - name: Build - run: cargo build --verbose diff --git a/Cargo.toml b/Cargo.toml index 74085b6..f3c178f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["crates/*", "examples/water_bins/*", "examples/clients/*"] +members = ["crates/*", "examples/water_bins/*", "examples/clients/*", "tests"] # exclude = ["crates/foo", "path/to/other"] default-members = ["crates/*"] resolver="2" diff --git a/README.md b/README.md index 59e6b9b..2601a8c 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,20 @@ nc 127.0.0.1 9005 ``` you should see `> CONNECTED` in the terminal of running WASM, then you can connect a bunch like this and input anything to see how it echos. + +## Running tests + +```sh +# runs ALL tests +cargo test --workspace --verbose + +# run tests for a single crate +cargo test -p --verbose + +# run a single test (or test matching name prefix) in a single crate +cargo test -p --verbose -- +``` + ## TODOs - [ ] wasm_config sharing implementation - [ ] Generalize Host export TCP listener helper function diff --git a/crates/wasm/.cargo/config b/crates/wasm/.cargo/config new file mode 100644 index 0000000..bca99d0 --- /dev/null +++ b/crates/wasm/.cargo/config @@ -0,0 +1,5 @@ +[build] +target = "wasm32-wasi" + +[target.wasm32-wasi] +rustflags = [ "--cfg", "tokio_unstable"] diff --git a/crates/water_wasm/Cargo.toml b/crates/wasm/Cargo.toml similarity index 100% rename from crates/water_wasm/Cargo.toml rename to crates/wasm/Cargo.toml diff --git a/crates/water_wasm/src/config.rs b/crates/wasm/src/config.rs similarity index 100% rename from crates/water_wasm/src/config.rs rename to crates/wasm/src/config.rs diff --git a/crates/water_wasm/src/connections.rs b/crates/wasm/src/connections.rs similarity index 100% rename from crates/water_wasm/src/connections.rs rename to crates/wasm/src/connections.rs diff --git a/crates/water_wasm/src/decoder.rs b/crates/wasm/src/decoder.rs similarity index 100% rename from crates/water_wasm/src/decoder.rs rename to crates/wasm/src/decoder.rs diff --git a/crates/water_wasm/src/dialer.rs b/crates/wasm/src/dialer.rs similarity index 95% rename from crates/water_wasm/src/dialer.rs rename to crates/wasm/src/dialer.rs index 6d38cff..6c92b9d 100644 --- a/crates/water_wasm/src/dialer.rs +++ b/crates/wasm/src/dialer.rs @@ -58,7 +58,7 @@ impl Dialer { }; if fd < 0 { - return Err(anyhow!("failed to create listener")); + return Err(anyhow!("failed to connect to remote")); } Ok(fd) diff --git a/crates/water_wasm/src/encoder.rs b/crates/wasm/src/encoder.rs similarity index 100% rename from crates/water_wasm/src/encoder.rs rename to crates/wasm/src/encoder.rs diff --git a/crates/water_wasm/src/lib.rs b/crates/wasm/src/lib.rs similarity index 100% rename from crates/water_wasm/src/lib.rs rename to crates/wasm/src/lib.rs diff --git a/crates/water_wasm/src/listener.rs b/crates/wasm/src/listener.rs similarity index 100% rename from crates/water_wasm/src/listener.rs rename to crates/wasm/src/listener.rs diff --git a/crates/water_wasm/src/net/mod.rs b/crates/wasm/src/net/mod.rs similarity index 100% rename from crates/water_wasm/src/net/mod.rs rename to crates/wasm/src/net/mod.rs diff --git a/crates/water_wasm/src/net/tls.rs b/crates/wasm/src/net/tls.rs similarity index 100% rename from crates/water_wasm/src/net/tls.rs rename to crates/wasm/src/net/tls.rs diff --git a/crates/water_wasm/src/v1/README.md b/crates/wasm/src/v1/README.md similarity index 100% rename from crates/water_wasm/src/v1/README.md rename to crates/wasm/src/v1/README.md diff --git a/crates/water_wasm/src/v1/async_listener_v1.rs b/crates/wasm/src/v1/async_listener_v1.rs similarity index 100% rename from crates/water_wasm/src/v1/async_listener_v1.rs rename to crates/wasm/src/v1/async_listener_v1.rs diff --git a/crates/water_wasm/src/v1/config_v1.rs b/crates/wasm/src/v1/config_v1.rs similarity index 100% rename from crates/water_wasm/src/v1/config_v1.rs rename to crates/wasm/src/v1/config_v1.rs diff --git a/crates/water_wasm/src/v1/dial_v1.rs b/crates/wasm/src/v1/dial_v1.rs similarity index 100% rename from crates/water_wasm/src/v1/dial_v1.rs rename to crates/wasm/src/v1/dial_v1.rs diff --git a/crates/water_wasm/src/v1/mod.rs b/crates/wasm/src/v1/mod.rs similarity index 100% rename from crates/water_wasm/src/v1/mod.rs rename to crates/wasm/src/v1/mod.rs diff --git a/crates/water_wasm/src/version.rs b/crates/wasm/src/version.rs similarity index 100% rename from crates/water_wasm/src/version.rs rename to crates/wasm/src/version.rs diff --git a/crates/water/src/config/mod.rs b/crates/water/src/config/mod.rs index 8424ab4..7bd0345 100644 --- a/crates/water/src/config/mod.rs +++ b/crates/water/src/config/mod.rs @@ -4,7 +4,7 @@ pub struct WATERConfig { pub filepath: String, pub entry_fn: String, pub config_wasm: String, - pub client_type: u32, + pub client_type: WaterBinType, pub debug: bool, } @@ -13,7 +13,7 @@ impl WATERConfig { filepath: String, entry_fn: String, config_wasm: String, - client_type: u32, + client_type: WaterBinType, debug: bool, ) -> Result { Ok(WATERConfig { @@ -25,3 +25,26 @@ impl WATERConfig { }) } } + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum WaterBinType { + Unknown, + Wrap, + Dial, + Listen, + Relay, + Runner, +} + +impl From for WaterBinType { + fn from(num: u32) -> Self { + match num { + 0 => WaterBinType::Dial, + 1 => WaterBinType::Listen, + 2 => WaterBinType::Runner, + 3 => WaterBinType::Wrap, + 4 => WaterBinType::Relay, + _ => WaterBinType::Unknown, + } + } +} diff --git a/crates/water/src/globals.rs b/crates/water/src/globals.rs index 62baec5..c704154 100644 --- a/crates/water/src/globals.rs +++ b/crates/water/src/globals.rs @@ -16,6 +16,7 @@ pub const WRITE_DONE_FN: &str = "_user_write_done"; pub const WATER_BRIDGING_FN: &str = "_set_inbound"; pub const READER_FN: &str = "_read"; pub const WRITER_FN: &str = "_write"; +pub const DIAL_FN: &str = "_dial"; pub const RUNTIME_VERSION_MAJOR: i32 = 0x001aaaaa; pub const RUNTIME_VERSION: &str = "v0.1-alpha"; diff --git a/crates/water/src/lib.rs b/crates/water/src/lib.rs index c4cd117..58b06d1 100644 --- a/crates/water/src/lib.rs +++ b/crates/water/src/lib.rs @@ -12,3 +12,11 @@ pub mod errors; pub mod globals; pub mod runtime; pub mod utils; + +#[cfg(test)] +mod tests { + #[test] + fn water_runtime_test() { + assert_eq!(1, 1); + } +} diff --git a/crates/water/src/runtime/core.rs b/crates/water/src/runtime/core.rs index 4dc653f..b8a1ba9 100644 --- a/crates/water/src/runtime/core.rs +++ b/crates/water/src/runtime/core.rs @@ -136,12 +136,16 @@ impl H2O { } }; - // open the config file and insert to WASM - let dir = Dir::open_ambient_dir(".", ambient_authority())?; // Open the root directory - let wasi_file = dir.open_with( - &config.config_wasm, - OpenOptions::new().read(true).write(true), - )?; + // Obtain the directory path and file name from config_wasm + let full_path = Path::new(&config.config_wasm); + let parent_dir = full_path.parent().unwrap(); // Assumes config_wasm has a parent directory + let file_name = full_path.file_name().unwrap().to_str().unwrap(); // Assumes file_name is valid UTF-8 + + // Open the parent directory + let dir = Dir::open_ambient_dir(parent_dir, ambient_authority())?; + + let wasi_file = dir.open_with(file_name, OpenOptions::new().read(true).write(true))?; + let wasi_file = wasmtime_wasi::sync::file::File::from_cap_std(wasi_file); let ctx = self.store.data_mut().preview1_ctx.as_mut().unwrap(); diff --git a/crates/water/src/runtime/mod.rs b/crates/water/src/runtime/mod.rs index 2327dfa..b190da1 100644 --- a/crates/water/src/runtime/mod.rs +++ b/crates/water/src/runtime/mod.rs @@ -13,6 +13,7 @@ pub mod version_common; use std::{ io::{Read, Write}, os::unix::io::{AsRawFd, FromRawFd}, + path::Path, sync::Arc, }; @@ -32,8 +33,8 @@ use wasmtime_wasi_threads::WasiThreadsCtx; // =================== CURRENT CRATE IMPORTS =================== use crate::{ - config::WATERConfig, - globals::{CONFIG_FN, INIT_FN, READER_FN, WATER_BRIDGING_FN, WRITER_FN}, + config::{WATERConfig, WaterBinType}, + globals::{CONFIG_FN, DIAL_FN, INIT_FN, READER_FN, WATER_BRIDGING_FN, WRITER_FN}, }; // =================== MODULES' DEPENDENCIES =================== @@ -61,19 +62,23 @@ pub struct WATERClient { impl WATERClient { pub fn new(conf: WATERConfig) -> Result { // client_type: 0 -> Dialer, 1 -> Listener, 2 -> Runner - let water: WATERClientType; - if conf.client_type == 0 { - let stream = WATERStream::init(&conf)?; - water = WATERClientType::Dialer(stream); - } else if conf.client_type == 1 { - let stream = WATERListener::init(&conf)?; - water = WATERClientType::Listener(stream); - } else if conf.client_type == 2 { - let runner = WATERRunner::init(&conf)?; - water = WATERClientType::Runner(runner); - } else { - return Err(anyhow::anyhow!("Invalid client type")); - } + let water = match conf.client_type { + WaterBinType::Dial => { + let stream = WATERStream::init(&conf)?; + WATERClientType::Dialer(stream) + } + WaterBinType::Listen => { + let stream = WATERListener::init(&conf)?; + WATERClientType::Listener(stream) + } + WaterBinType::Runner => { + let runner = WATERRunner::init(&conf)?; + WATERClientType::Runner(runner) + } + _ => { + return Err(anyhow::anyhow!("Invalid client type")); + } + }; Ok(WATERClient { config: conf, diff --git a/crates/water/src/runtime/runner.rs b/crates/water/src/runtime/runner.rs index e20bf04..7faab96 100644 --- a/crates/water/src/runtime/runner.rs +++ b/crates/water/src/runtime/runner.rs @@ -13,7 +13,10 @@ impl WATERRunner { .core .instance .get_func(&mut self.core.store, &conf.entry_fn) - .unwrap(); + .context(format!( + "failed to find declared entry function: {}", + &conf.entry_fn + ))?; match fnc.call(&mut self.core.store, &[], &mut []) { Ok(_) => {} Err(e) => return Err(anyhow::Error::msg(format!("run function failed: {}", e))), diff --git a/crates/water/src/runtime/stream.rs b/crates/water/src/runtime/stream.rs index b0d151c..e6c6709 100644 --- a/crates/water/src/runtime/stream.rs +++ b/crates/water/src/runtime/stream.rs @@ -125,11 +125,7 @@ impl WATERStream { // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port // let fnc = self.core.instance.get_func(&mut self.core.store, &conf.entry_fn).unwrap(); - let fnc = match self - .core - .instance - .get_func(&mut self.core.store, &conf.entry_fn) - { + let fnc = match self.core.instance.get_func(&mut self.core.store, DIAL_FN) { Some(func) => func, None => { return Err(anyhow::Error::msg(format!( @@ -168,7 +164,12 @@ impl WATERStream { std::mem::forget(water_io); // forget the water_io, so that it won't be closed - let ctx = core.store.data_mut().preview1_ctx.as_mut().unwrap(); + let ctx = core + .store + .data_mut() + .preview1_ctx + .as_mut() + .context("Failed to retrieve preview1_ctx from Host")?; let water_io_fd = ctx.push_file(Box::new(water_io_file), FileAccessMode::all())?; let water_bridging = match core.instance.get_func(&mut core.store, WATER_BRIDGING_FN) { diff --git a/examples/clients/cli/Cargo.toml b/examples/clients/cli/Cargo.toml index 3f1213f..c7835f3 100644 --- a/examples/clients/cli/Cargo.toml +++ b/examples/clients/cli/Cargo.toml @@ -1,5 +1,5 @@ [package] -name="cli" +name="cli-dev" version = "0.1.0" authors.workspace = true description.workspace = true diff --git a/examples/clients/cli/src/cli.rs b/examples/clients/cli/src/cli.rs index 038e336..2954a06 100644 --- a/examples/clients/cli/src/cli.rs +++ b/examples/clients/cli/src/cli.rs @@ -26,7 +26,7 @@ struct Args { #[arg(short, long, default_value_t = 2)] type_client: u32, - /// Optional argument specifying the client_type, default to be Runner + /// Optional argument enabling debug logging #[arg(short, long, default_value_t = false)] debug: bool, } @@ -37,7 +37,7 @@ impl From for WATERConfig { filepath: args.wasm_path, entry_fn: args.entry_fn, config_wasm: args.config_wasm, - client_type: args.type_client, + client_type: args.type_client.into(), debug: args.debug, } } diff --git a/examples/water_bins/echo_client/Cargo.toml b/examples/water_bins/echo_client/Cargo.toml index 90e1094..a1c1556 100644 --- a/examples/water_bins/echo_client/Cargo.toml +++ b/examples/water_bins/echo_client/Cargo.toml @@ -28,4 +28,4 @@ url = { version = "2.2.2", features = ["serde"] } libc = "0.2.147" # water wasm lib import -water-wasm = { path = "../../../crates/water_wasm/", version = "0.1.0" } +water-wasm = { path = "../../../crates/wasm/", version = "0.1.0" } diff --git a/examples/water_bins/echo_client/proxy.wasm b/examples/water_bins/echo_client/proxy.wasm index 05afb34..e0c2079 100644 Binary files a/examples/water_bins/echo_client/proxy.wasm and b/examples/water_bins/echo_client/proxy.wasm differ diff --git a/examples/water_bins/echo_client/src/lib.rs b/examples/water_bins/echo_client/src/lib.rs index b09b6dc..a13e8d8 100644 --- a/examples/water_bins/echo_client/src/lib.rs +++ b/examples/water_bins/echo_client/src/lib.rs @@ -10,6 +10,10 @@ use water_wasm::*; pub mod async_socks5_listener; +// Export the version of this WASM module +#[export_name = "V1"] +pub static V1: i32 = 0; + // create a mutable global variable stores a pointer to the config lazy_static! { static ref DIALER: Mutex = Mutex::new(Dialer::new()); diff --git a/examples/water_bins/ss_client_wasm_v1/Cargo.toml b/examples/water_bins/ss_client_wasm_v1/Cargo.toml index 8f450a4..5416684 100644 --- a/examples/water_bins/ss_client_wasm_v1/Cargo.toml +++ b/examples/water_bins/ss_client_wasm_v1/Cargo.toml @@ -38,4 +38,4 @@ futures = "0.3.28" pin-project = "1.1.2" # water wasm lib import -water-wasm = { path = "../../../crates/water_wasm/", version = "0.1.0" } +water-wasm = { path = "../../../crates/wasm/", version = "0.1.0" } diff --git a/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm b/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm index a44c6df..3995030 100644 Binary files a/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm and b/examples/water_bins/ss_client_wasm_v1/ss_client_wasm.wasm differ diff --git a/tests/Cargo.toml b/tests/Cargo.toml new file mode 100644 index 0000000..4337d41 --- /dev/null +++ b/tests/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "tests" +version = "0.1.0" +authors.workspace = true +description.workspace = true +edition.workspace = true +publish = false + + +[dev-dependencies] +water-wasm = { path = "../crates/wasm" } +water = { path = "../crates/water" } + +tracing = "0.1" +tracing-subscriber = "0.3.17" +rand = "0.8" +pprof = { version = "0.11.1", features = ["flamegraph", "protobuf-codec", "prost-codec"] } +anyhow = "1.0.7" +shadowsocks-service = {version = "1.17.0", features = ["server"]} +shadowsocks-rust = "1.17.0" +tokio = { version = "1.24.2", features = ["full", "macros"] } +futures = "0.3.28" +tempfile = "3.8.0" \ No newline at end of file diff --git a/tests/SS_testing.rs b/tests/SS_testing.rs deleted file mode 100644 index d67d53a..0000000 --- a/tests/SS_testing.rs +++ /dev/null @@ -1,142 +0,0 @@ -// use cap_std::net::TcpStream; -use water::*; -use rand; - -use pprof::protos::Message; -use std::net::{TcpListener, TcpStream}; -use std::thread; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -use tracing_subscriber; -use tracing::Level; - -use std::time::Instant; -use tracing::info; - -use std::io::{Read, Write, ErrorKind}; -use std::thread::sleep; -use std::time::Duration; - -#[test] -fn wasm_managed_shadowsocks_async() { - tracing_subscriber::fmt() - .with_max_level(Level::INFO) - .init(); - let conf = config::WATERConfig::init(String::from("./tests/test_wasm/ss_client_wasm.wasm"), - String::from("ss_client_execute"), - String::from("./tests/test_data/config.json"), - 2, - true).unwrap(); - - let mut water_client = runtime::WATERClient::new(conf).unwrap(); - water_client.execute().unwrap(); -} - -// #[test] -// fn SS_handler_testing() { -// tracing_subscriber::fmt() -// .with_max_level(Level::INFO) -// .init(); - -// let listener = TcpListener::bind("127.0.0.1:1080").expect("Failed to bind to address"); -// println!("Listening on {:?}", listener.local_addr().unwrap()); -// for stream in listener.incoming() { -// match stream { -// Ok(client) => { -// // handle onely 1 client -// handle_client(client); -// } -// Err(e) => { -// println!("Error accepting client: {}", e); -// } -// } -// } -// } - -// this is the test where SOCKS5 server + listener is at the Host -- V0 -// #[test] -// fn SS_client_no_socks5() -> Result<(), anyhow::Error> { -// tracing_subscriber::fmt() -// .with_max_level(Level::INFO) -// .init(); - -// // --------- start to dial the listener --------- -// let dial_handle = std::thread::spawn(|| -> Result<(), anyhow::Error> { -// // Measure initialization time -// let conf = config::WATERConfig::init(String::from("./tests/test_wasm/proxy.wasm"), String::from("_dial"), String::from("./tests/test_data/config.json"), 0, true)?; -// let mut water_client = runtime::WATERClient::new(conf)?; -// water_client.connect("", 0)?; - -// // let mut water_client = TcpStream::connect(("127.0.0.1", 8088))?; - -// // Not measuring the profiler guard initialization since it's unrelated to the read/write ops -// let guard = pprof::ProfilerGuard::new(100).unwrap(); - -// let single_data_size = 1024; // Bytes per iteration -// let total_iterations = 1; - -// let random_data: Vec = (0..single_data_size).map(|_| rand::random::()).collect(); - -// let start = Instant::now(); -// for _ in 0..total_iterations { -// water_client.write(&random_data)?; - -// let mut buf = vec![0; single_data_size]; -// water_client.read(&mut buf)?; -// } - -// let elapsed_time = start.elapsed().as_secs_f64(); -// let total_data_size_mb = (total_iterations * single_data_size) as f64; -// let avg_bandwidth = total_data_size_mb / elapsed_time / 1024.0 / 1024.0; - -// info!("avg bandwidth: {:.2} MB/s (N={})", avg_bandwidth, total_iterations); - - -// let single_data_size = 1024; // Bytes per iteration -// let total_iterations = 100; - -// let random_data: Vec = (0..single_data_size).map(|_| rand::random::()).collect(); - -// let start = Instant::now(); -// for _ in 0..total_iterations { -// water_client.write(&random_data)?; - -// let mut buf = vec![0; single_data_size]; -// water_client.read(&mut buf)?; -// } - -// let elapsed_time = start.elapsed().as_secs_f64(); -// let total_data_size_mb = (total_iterations * single_data_size) as f64; -// let avg_bandwidth = total_data_size_mb / elapsed_time / 1024.0 / 1024.0; - -// info!("avg bandwidth: {:.2} MB/s (N={})", avg_bandwidth, total_iterations); - -// // Stop and report profiler data -// if let Ok(report) = guard.report().build() { -// // println!("{:?}", report); -// // report.flamegraph(std::io::stdout())?; -// let mut file = std::fs::File::create("flamegraph.svg")?; -// report.flamegraph(file)?; - -// // let mut file = std::fs::File::create("profile.pb")?; -// // report.pprof(file)?; -// let mut file = std::fs::File::create("profile.pb").unwrap(); -// let profile = report.pprof().unwrap(); - -// let mut content = Vec::new(); -// // profile.encode(&mut content).unwrap(); -// profile.write_to_vec(&mut content).unwrap(); -// file.write_all(&content).unwrap(); -// } - -// Ok(()) -// }); - -// dial_handle.join().expect("Listener thread panicked")?; - -// // // Signal the listener thread to stop -// // should_stop.store(true, Ordering::Relaxed); - -// Ok(()) -// } \ No newline at end of file diff --git a/tests/benchmarking_v0.rs b/tests/benches/benchmarking_v0.rs similarity index 88% rename from tests/benchmarking_v0.rs rename to tests/benches/benchmarking_v0.rs index 75f2b40..a2148f5 100644 --- a/tests/benchmarking_v0.rs +++ b/tests/benches/benchmarking_v0.rs @@ -1,28 +1,26 @@ // use cap_std::net::TcpStream; use water::*; -use rand; use pprof::protos::Message; -use std::net::{TcpListener, TcpStream}; +// use std::net::{TcpListener, TcpStream}; +use std::net::TcpListener; use std::thread; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +// use std::sync::atomic::{AtomicBool, Ordering}; +// use std::sync::Arc; -use tracing_subscriber; use tracing::Level; use std::time::Instant; use tracing::info; -use std::io::{Read, Write, ErrorKind}; -use std::thread::sleep; -use std::time::Duration; +use std::io::{Read, Write}; +// use std::io::{Read, Write, ErrorKind}; +// use std::thread::sleep; +// use std::time::Duration; #[test] fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { - tracing_subscriber::fmt() - .with_max_level(Level::INFO) - .init(); + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); thread::spawn(move || { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); @@ -39,7 +37,7 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { if n == 0 { break; // Connection was closed. } - + // Echo data back to client. if let Err(e) = stream.write_all(&buf[..n]) { eprintln!("Error writing to client: {:?}", e); @@ -68,20 +66,28 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { // --------- start to dial the listener --------- let dial_handle = std::thread::spawn(|| -> Result<(), anyhow::Error> { // Measure initialization time - let conf = config::WATERConfig::init(String::from("./tests/test_wasm/proxy.wasm"), String::from("_dial"), String::from("./tests/test_data/config.json"), 0, true)?; + let conf = config::WATERConfig::init( + String::from("./tests/test_wasm/proxy.wasm"), + String::from("_dial"), + String::from("./tests/test_data/config.json"), + config::WaterBinType::Dial, + true, + )?; let mut water_client = runtime::WATERClient::new(conf)?; water_client.connect("", 0)?; // let mut water_client = TcpStream::connect(("127.0.0.1", 8088))?; - + // Not measuring the profiler guard initialization since it's unrelated to the read/write ops let guard = pprof::ProfilerGuard::new(100).unwrap(); let single_data_size = 1024; // Bytes per iteration let total_iterations = 1; - - let random_data: Vec = (0..single_data_size).map(|_| rand::random::()).collect(); - + + let random_data: Vec = (0..single_data_size) + .map(|_| rand::random::()) + .collect(); + let start = Instant::now(); for _ in 0..total_iterations { water_client.write(&random_data)?; @@ -94,14 +100,18 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { let total_data_size_mb = (total_iterations * single_data_size) as f64; let avg_bandwidth = total_data_size_mb / elapsed_time / 1024.0 / 1024.0; - info!("avg bandwidth: {:.2} MB/s (N={})", avg_bandwidth, total_iterations); - - + info!( + "avg bandwidth: {:.2} MB/s (N={})", + avg_bandwidth, total_iterations + ); + let single_data_size = 1024; // Bytes per iteration let total_iterations = 100; - - let random_data: Vec = (0..single_data_size).map(|_| rand::random::()).collect(); - + + let random_data: Vec = (0..single_data_size) + .map(|_| rand::random::()) + .collect(); + let start = Instant::now(); for _ in 0..total_iterations { water_client.write(&random_data)?; @@ -114,14 +124,17 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { let total_data_size_mb = (total_iterations * single_data_size) as f64; let avg_bandwidth = total_data_size_mb / elapsed_time / 1024.0 / 1024.0; - info!("avg bandwidth: {:.2} MB/s (N={})", avg_bandwidth, total_iterations); - + info!( + "avg bandwidth: {:.2} MB/s (N={})", + avg_bandwidth, total_iterations + ); + // ================== test more iterations ================== // let single_data_size = 1024; // Bytes per iteration // let total_iterations = 10000; - + // let random_data: Vec = (0..single_data_size).map(|_| rand::random::()).collect(); - + // let start = Instant::now(); // for _ in 0..total_iterations { // water_client.write(&random_data)?; @@ -135,13 +148,12 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { // let avg_bandwidth = total_data_size_mb / elapsed_time / 1024.0 / 1024.0; // info!("avg bandwidth: {:.2} MB/s (N={})", avg_bandwidth, total_iterations); - - + // let single_data_size = 1024; // Bytes per iteration // let total_iterations = 43294; - + // let random_data: Vec = (0..single_data_size).map(|_| rand::random::()).collect(); - + // let start = Instant::now(); // for _ in 0..total_iterations { // water_client.write(&random_data)?; @@ -155,19 +167,19 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { // let avg_bandwidth = total_data_size_mb / elapsed_time / 1024.0 / 1024.0; // info!("avg bandwidth: {:.2} MB/s (N={})", avg_bandwidth, total_iterations); - + // Stop and report profiler data if let Ok(report) = guard.report().build() { // println!("{:?}", report); // report.flamegraph(std::io::stdout())?; - let mut file = std::fs::File::create("flamegraph.svg")?; + let file = std::fs::File::create("flamegraph.svg")?; report.flamegraph(file)?; - + // let mut file = std::fs::File::create("profile.pb")?; // report.pprof(file)?; let mut file = std::fs::File::create("profile.pb").unwrap(); let profile = report.pprof().unwrap(); - + let mut content = Vec::new(); // profile.encode(&mut content).unwrap(); profile.write_to_vec(&mut content).unwrap(); @@ -197,7 +209,7 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { // // water_client.execute(); // // Ok(()) - + // let tcp = std::net::TcpListener::bind(("127.0.0.1", 8088)).unwrap(); // loop { @@ -257,9 +269,9 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { // // keep reading from stdin and call read and write function from water_client.stream // let mut buf = String::new(); // std::io::stdin().read_line(&mut buf)?; - + // water_client.write(buf.as_bytes())?; - + // let mut buf = vec![0; 1024]; // water_client.read(&mut buf)?; @@ -267,4 +279,4 @@ fn benchmarking_v0_echo() -> Result<(), anyhow::Error> { // } // Ok(()) -// } \ No newline at end of file +// } diff --git a/tests/test_wasm/echo_client.wasm b/tests/test_wasm/echo_client.wasm new file mode 100644 index 0000000..e0c2079 Binary files /dev/null and b/tests/test_wasm/echo_client.wasm differ diff --git a/tests/test_wasm/ss_client_wasm.wasm b/tests/test_wasm/ss_client_wasm.wasm index a09bf1d..3995030 100644 Binary files a/tests/test_wasm/ss_client_wasm.wasm and b/tests/test_wasm/ss_client_wasm.wasm differ diff --git a/tests/tests/echo_tests.rs b/tests/tests/echo_tests.rs new file mode 100644 index 0000000..7030308 --- /dev/null +++ b/tests/tests/echo_tests.rs @@ -0,0 +1,66 @@ +use water::*; + +use std::{ + fs::File, + io::{Read, Write}, + net::TcpListener, +}; + +use tempfile::tempdir; + +#[test] +fn test_echo() -> Result<(), Box> { + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 8080, + "local_address": "127.0.0.1", + "local_port": 8088 + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + let test_message = b"hello"; + let handle = std::thread::spawn(|| { + // let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + let listener = TcpListener::bind(("127.0.0.1", 8080)).unwrap(); + let (mut socket, _) = listener.accept().unwrap(); + let mut buf = [0; 1024]; + let res = socket.read(&mut buf); + + assert!(res.is_ok()); + let read_bytes = res.unwrap(); + + assert_eq!(read_bytes, test_message.len()); + let res = socket.write(&buf[..read_bytes]); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), test_message.len()); + }); + + let conf = config::WATERConfig::init( + String::from("./test_wasm/echo_client.wasm"), + String::from("_init"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Dial, + true, + ) + .unwrap(); + + let mut water_client = runtime::WATERClient::new(conf).unwrap(); + water_client.connect("127.0.0.1", 8080).unwrap(); + water_client.write(test_message).unwrap(); + + let mut buf = vec![0; 32]; + let res = water_client.read(&mut buf); + assert!(res.is_ok()); + assert_eq!(res.unwrap() as usize, test_message.len()); + + drop(file); + dir.close()?; + handle.join().unwrap(); + Ok(()) +} diff --git a/tests/tests/ss_testing.rs b/tests/tests/ss_testing.rs new file mode 100644 index 0000000..a1e7867 --- /dev/null +++ b/tests/tests/ss_testing.rs @@ -0,0 +1,168 @@ +use water::*; + +// use rand; +// use pprof::protos::Message; +// use tracing::info; + +use tracing::Level; + +use std::thread; +use std::{ + net::{SocketAddr, ToSocketAddrs}, + str, +}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + time::{self, Duration}, +}; + +use shadowsocks_service::{ + config::{ + Config, ConfigType, LocalConfig, LocalInstanceConfig, ProtocolType, ServerInstanceConfig, + }, + local::socks::client::socks5::Socks5TcpClient, + run_local, run_server, + shadowsocks::{ + config::{Mode, ServerAddr, ServerConfig}, + crypto::CipherKind, + relay::socks5::Address, + }, +}; + +pub struct Socks5TestServer { + local_addr: SocketAddr, + svr_config: Config, + cli_config: Config, +} + +impl Socks5TestServer { + pub fn new( + svr_addr: S, + local_addr: L, + pwd: &str, + method: CipherKind, + enable_udp: bool, + ) -> Socks5TestServer + where + S: ToSocketAddrs, + L: ToSocketAddrs, + { + let svr_addr = svr_addr.to_socket_addrs().unwrap().next().unwrap(); + let local_addr = local_addr.to_socket_addrs().unwrap().next().unwrap(); + + Socks5TestServer { + local_addr, + svr_config: { + let mut cfg = Config::new(ConfigType::Server); + cfg.server = vec![ServerInstanceConfig::with_server_config(ServerConfig::new( + svr_addr, + pwd.to_owned(), + method, + ))]; + cfg.server[0].config.set_mode(if enable_udp { + Mode::TcpAndUdp + } else { + Mode::TcpOnly + }); + cfg + }, + cli_config: { + let mut cfg = Config::new(ConfigType::Local); + cfg.local = vec![LocalInstanceConfig::with_local_config( + LocalConfig::new_with_addr(ServerAddr::from(local_addr), ProtocolType::Socks), + )]; + cfg.local[0].config.mode = if enable_udp { + Mode::TcpAndUdp + } else { + Mode::TcpOnly + }; + cfg.server = vec![ServerInstanceConfig::with_server_config(ServerConfig::new( + svr_addr, + pwd.to_owned(), + method, + ))]; + cfg + }, + } + } + + pub fn client_addr(&self) -> &SocketAddr { + &self.local_addr + } + + pub async fn run(&self) { + let svr_cfg = self.svr_config.clone(); + tokio::spawn(run_server(svr_cfg)); + + let client_cfg = self.cli_config.clone(); + tokio::spawn(run_local(client_cfg)); + + time::sleep(Duration::from_secs(1)).await; + } +} + +// const SERVER_CONF_STR: &str = r#" +// { +// "server": "127.0.0.1", +// "server_port": 8388, +// "password": "Test!23", +// "method": "chacha20-ietf-poly1305", +// } +// "#; + +#[tokio::test] +async fn wasm_managed_shadowsocks_async() { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + + // ==== setup official Shadowsocks server ==== + const SERVER_ADDR: &str = "127.0.0.1:8388"; + const LOCAL_ADDR: &str = "127.0.0.1:8081"; + + const PASSWORD: &str = "Test!23"; + const METHOD: CipherKind = CipherKind::CHACHA20_POLY1305; + + let svr = Socks5TestServer::new(SERVER_ADDR, LOCAL_ADDR, PASSWORD, METHOD, false); + svr.run().await; + + // ==== setup WASM Shadowsocks client ==== + let conf = config::WATERConfig::init( + String::from("./test_wasm/ss_client_wasm.wasm"), + String::from("v1_listen"), + String::from("./test_data/config.json"), + config::WaterBinType::Runner, + true, + ) + .unwrap(); + + let mut water_client = runtime::WATERClient::new(conf).unwrap(); + + // ==== spawn a thread to run WASM Shadowsocks client ==== + thread::spawn(move || { + water_client.execute().unwrap(); + }); + + let wasm_ss_client_addr = SocketAddr::new("127.0.0.1".parse().unwrap(), 8080); + + // Give some time for the WASM client to start + thread::sleep(Duration::from_millis(100)); + + // ==== test WASM Shadowsocks client ==== + let mut c = Socks5TcpClient::connect( + Address::DomainNameAddress("detectportal.firefox.com".to_owned(), 80), + wasm_ss_client_addr, + ) + .await + .unwrap(); + + let req = b"GET /success.txt HTTP/1.0\r\nHost: detectportal.firefox.com\r\nAccept: */*\r\n\r\n"; + c.write_all(req).await.unwrap(); + c.flush().await.unwrap(); + + let mut r = BufReader::new(c); + + let mut buf = Vec::new(); + r.read_until(b'\n', &mut buf).await.unwrap(); + + let http_status = b"HTTP/1.0 200 OK\r\n"; + assert!(buf.starts_with(http_status)); +}