Skip to content

Commit

Permalink
fix: sleep 1 sec before terminating relay to allow the test finish + …
Browse files Browse the repository at this point in the history
…clippy
  • Loading branch information
erikziyunchi committed Nov 10, 2023
1 parent 20b832d commit 8bbc3c7
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 31 deletions.
6 changes: 3 additions & 3 deletions crates/water/src/runtime/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ impl H2O<Host> {
Some(Version::V0(ref config)) => match config {
Some(v0_conf) => {
// let v0_conf = Arc::new(Mutex::new(v0_conf.clone()));
v0::funcs::export_tcp_connect(&mut linker, Arc::clone(&v0_conf))?;
v0::funcs::export_accept(&mut linker, Arc::clone(&v0_conf))?;
v0::funcs::export_defer(&mut linker, Arc::clone(&v0_conf))?;
v0::funcs::export_tcp_connect(&mut linker, Arc::clone(v0_conf))?;
v0::funcs::export_accept(&mut linker, Arc::clone(v0_conf))?;
v0::funcs::export_defer(&mut linker, Arc::clone(v0_conf))?;

// // if client_type is Listen, then create a listener with the same config
// if conf.client_type == WaterBinType::Listen {
Expand Down
12 changes: 5 additions & 7 deletions crates/water/src/runtime/v0/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,18 @@ impl V0Config {
info!("[HOST] WATERCore V0 connecting to {}", addr);

match &mut self.conn {
V0CRole::Relay(lis, ref mut conn_fd) => {
V0CRole::Relay(_lis, ref mut conn_fd) => {
// now relay has been built, need to dial
let conn = std::net::TcpStream::connect(addr)?;
*conn_fd = conn.as_raw_fd();
return Ok(conn);
Ok(conn)
}
V0CRole::Unknown => {
let conn = std::net::TcpStream::connect(addr)?;
self.conn = V0CRole::Dialer(conn.as_raw_fd());
return Ok(conn);
}
_ => {
return Err(anyhow::Error::msg("not a dialer"));
Ok(conn)
}
_ => Err(anyhow::Error::msg("not a dialer")),
}
}

Expand Down Expand Up @@ -158,7 +156,7 @@ impl V0Config {
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) };
drop(conn);
}
V0CRole::Relay(listener, conn) => {
V0CRole::Relay(_listener, conn) => {
// Listener shouldn't be deferred, like the above reason
// let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) };
// drop(listener);
Expand Down
7 changes: 3 additions & 4 deletions crates/water/src/runtime/v0/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ impl WATERListenerTrait for WATERListener<Host> {
fn listen(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> {
info!("[HOST] WATERListener v0 create listener...");

match &mut self.core.version {
Version::V0(v0_conf) => match v0_conf {
if let Version::V0(v0_conf) = &mut self.core.version {
match v0_conf {
Some(v0_conf) => match v0_conf.lock() {
Ok(mut v0_conf) => {
v0_conf.create_listener(false)?;
Expand All @@ -49,8 +49,7 @@ impl WATERListenerTrait for WATERListener<Host> {
None => {
return Err(anyhow::anyhow!("v0_conf is None"))?;
}
},
_ => {}
}
}

Ok(())
Expand Down
11 changes: 5 additions & 6 deletions crates/water/src/runtime/v0/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl WATERTransportTrait for WATERRelay<Host> {

impl WATERRelayTrait for WATERRelay<Host> {
/// Connect to the target address with running the WASM connect function
fn associate(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> {
fn associate(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> {
info!("[HOST] WATERRelay v0 associating...");

let mut store = self
Expand Down Expand Up @@ -72,12 +72,12 @@ impl WATERRelayTrait for WATERRelay<Host> {
Ok(())
}

fn relay(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> {
fn relay(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> {
info!("[HOST] WATERRelay v0 relaying...");

// create listener
match &mut self.core.version {
Version::V0(v0_conf) => match v0_conf {
if let Version::V0(v0_conf) = &mut self.core.version {
match v0_conf {
Some(v0_conf) => match v0_conf.lock() {
Ok(mut v0_conf) => {
v0_conf.create_listener(true)?;
Expand All @@ -89,8 +89,7 @@ impl WATERRelayTrait for WATERRelay<Host> {
None => {
return Err(anyhow::anyhow!("v0_conf is None"))?;
}
},
_ => {}
}
}

Ok(())
Expand Down
21 changes: 10 additions & 11 deletions tests/tests/cross_lang_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ fn test_cross_lang_wasm_relay() -> Result<(), Box<dyn std::error::Error>> {

assert!(res.is_ok());
let read_bytes = res.unwrap();

assert_eq!(read_bytes, test_message.len());

let res = socket.write(&buf[..read_bytes]);
assert!(res.is_ok());
assert_eq!(res.unwrap(), test_message.len());
Expand All @@ -230,37 +230,36 @@ fn test_cross_lang_wasm_relay() -> Result<(), Box<dyn std::error::Error>> {

water_client.relay().unwrap();

// std::thread::sleep(std::time::Duration::from_secs(1));

// connects to the relay, and the relay will connect to the listener
let handle_local = std::thread::spawn(|| {
// give some time let the listener start to accept
std::thread::sleep(std::time::Duration::from_secs(1));
let mut stream = TcpStream::connect(("127.0.0.1", 8084)).unwrap();
let res = stream.write(test_message);

let res = stream.write(test_message);
assert!(res.is_ok());
let write_bytes = res.unwrap();

assert_eq!(write_bytes, test_message.len());

let mut buf = [0; 1024];
let res = stream.read(&mut buf);
assert!(res.is_ok());
let read_bytes = res.unwrap();
assert_eq!(read_bytes, test_message.len());
});

water_client.associate().unwrap();

water_client.cancel_with().unwrap();

let handle_water = water_client.run_worker().unwrap();

// let mut buf = vec![0; 32];
// let res = water_client.read(&mut buf);
// assert!(res.is_ok());
// assert_eq!(res.unwrap() as usize, test_message.len());
// give it a second before cancel to let the connector check correct transfer
std::thread::sleep(std::time::Duration::from_secs(1));

water_client.cancel().unwrap();

drop(file);
dir.close()?;

handle_remote.join().unwrap();
handle_local.join().unwrap();
match handle_water.join().unwrap() {
Expand Down
115 changes: 115 additions & 0 deletions tests/tests/spinning_relay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#![allow(dead_code)]

use water::*;

use tracing::Level;

use std::{
fs::File,
io::{Error, ErrorKind, Read, Write},
net::{TcpListener, TcpStream},
vec,
};

use tempfile::tempdir;

// #[test]
fn test_cross_lang_wasm_relay() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();

let cfg_str = r#"
{
"remote_address": "127.0.0.1",
"remote_port": 8088,
"local_address": "127.0.0.1",
"local_port": 8082
}
"#;
// Create a directory inside of `std::env::temp_dir()`.
let dir = tempdir()?;
let file_path = dir.path().join("temp-config.txt");
let mut file = File::create(&file_path)?;
writeln!(file, "{}", cfg_str)?;

let test_message = b"hello";

// // starting the listener in another thread it to relay to
// let handle_remote = std::thread::spawn(|| {
// let listener = TcpListener::bind(("127.0.0.1", 8090)).unwrap();
// let (mut socket, _) = listener.accept().unwrap();
// let mut buf = [0; 1024];
// let res = socket.read(&mut buf);

// assert!(res.is_ok());
// let read_bytes = res.unwrap();
// assert_eq!(read_bytes, test_message.len());

// let res = socket.write(&buf[..read_bytes]);
// assert!(res.is_ok());
// assert_eq!(res.unwrap(), test_message.len());
// });

let conf = config::WATERConfig::init(
// plain.wasm is in v0 and fully compatible with the Go engine
// More details for the Go-side of running plain.wasm check here:
// https://github.com/gaukas/water/tree/master/examples/v0/plain
//
// More details for the implementation of plain.wasm check this PR:
// https://github.com/erikziyunchi/water-rs/pull/10
//
String::from("./test_wasm/plain.wasm"),
String::from("_water_worker"),
String::from(file_path.to_string_lossy()),
config::WaterBinType::Relay,
true,
)
.unwrap();

let mut water_client = runtime::client::WATERClient::new(conf).unwrap();

water_client.relay().unwrap();

// // connects to the relay, and the relay will connect to the listener
// let handle_local = std::thread::spawn(|| {
// // give some time let the listener start to accept
// std::thread::sleep(std::time::Duration::from_secs(3));
// let mut stream = TcpStream::connect(("127.0.0.1", 8084)).unwrap();

// let res = stream.write(test_message);
// assert!(res.is_ok());
// let write_bytes = res.unwrap();
// assert_eq!(write_bytes, test_message.len());

// let mut buf = [0; 1024];
// let res = stream.read(&mut buf);
// assert!(res.is_ok());
// let read_bytes = res.unwrap();
// assert_eq!(read_bytes, test_message.len());
// });

water_client.associate().unwrap();
water_client.cancel_with().unwrap();

let handle_water = water_client.run_worker().unwrap();

std::thread::sleep(std::time::Duration::from_secs(20));

water_client.cancel().unwrap();

drop(file);
dir.close()?;
// handle_remote.join().unwrap();
// handle_local.join().unwrap();
match handle_water.join().unwrap() {
Ok(_) => {}
Err(e) => {
eprintln!("Running _water_worker ERROR: {}", e);
return Err(Box::new(Error::new(
ErrorKind::Other,
"Failed to join _water_worker thread",
)));
}
};

Ok(())
}

0 comments on commit 8bbc3c7

Please sign in to comment.