Skip to content

Commit

Permalink
update: fix small v0 multi listener bug + v0 multi listener tests + m…
Browse files Browse the repository at this point in the history
…ore cli dev
  • Loading branch information
erikziyunchi committed Jan 9, 2024
1 parent c422064 commit a6fe4e8
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 104 deletions.
20 changes: 10 additions & 10 deletions crates/water/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl WATERClient {
}

/// keep_listen is the function that is called when user wants to accept a newly income connection,
/// it creates a new WASM instance and migrate the previous listener to it. Used by v0 listener and relay for now.
/// it creates a new WASM instance and migrate the previous listener to it. -- v0_plus listener and relay for now.
pub fn keep_listen(&mut self) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERClient keep listening...",);

Expand Down Expand Up @@ -128,7 +128,7 @@ impl WATERClient {
self.debug = debug;
}

/// `connect` is the entry point for `Dialer` to connect to a remote address
/// `connect` is the function for `Dialer` to connect to a remote address
pub fn connect(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient connecting ...");

Expand Down Expand Up @@ -161,7 +161,7 @@ impl WATERClient {
Ok(())
}

/// `associate` is the entry point for `Relay` to associate with a remote addr
/// `associate` is the function for `Relay` to associate a remote connection
pub fn associate(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient relaying ...");

Expand All @@ -176,8 +176,8 @@ impl WATERClient {
Ok(())
}

/// `accept` is the entry point for `Listener` to accept a connection
/// called after `listen`
/// `accept` is the function for `Listener` to accept a connection
/// called after `listen()`
pub fn accept(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient accepting ...");

Expand All @@ -192,8 +192,8 @@ impl WATERClient {
Ok(())
}

/// `run_worker` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in a separate thread
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0 currently
/// `run_worker` is the function to run the entry_fn(a worker in WATM) in a separate thread and return the thread handle
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0_plus
pub fn run_worker(
&mut self,
) -> Result<std::thread::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
Expand All @@ -207,7 +207,7 @@ impl WATERClient {
}
}

/// `execute` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in the current thread
/// `execute` is the function to run the entry_fn(a worker in WATM) in the current thread
/// -- replace the thread running Host when running it <- used by v1 currently
pub fn execute(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient Executing ...");
Expand All @@ -229,7 +229,7 @@ impl WATERClient {
Ok(())
}

/// `cancel_with` is the function to set the pipe for canceling later -- v0
/// `cancel_with` is the function to set the cancel pipe for exiting later -- v0_plus
pub fn cancel_with(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient cancel_with ...");

Expand All @@ -251,7 +251,7 @@ impl WATERClient {
Ok(())
}

/// `cancel` is the function to cancel the thread running the entry_fn -- v0
/// `cancel` is the function to send thru the cancel_pipe and let the thread running the worker to exit -- v0_plus
pub fn cancel(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient canceling ...");

Expand Down
13 changes: 7 additions & 6 deletions crates/water/src/runtime/v0/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ impl V0Config {
})
}

/// It will connect to the remote addr and set the fd in the V0Config
pub fn connect(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
let addr = format!("{}:{}", self.remote_addr, self.remote_port);

info!("[HOST] WATERCore V0 connecting to {}", addr);

match &mut self.conn {
// if the V0CRole is Relay, then it will remain as Relay
V0CRole::Relay(_, _, ref mut conn_fd) => {
// now relay has been built, need to dial
if *conn_fd != -1 {
Expand All @@ -109,6 +111,7 @@ impl V0Config {
*conn_fd = conn.as_raw_fd();
Ok(conn)
}
// if the V0CRole has not been set, and connect() was called, then it should be a dialer
V0CRole::Unknown => {
let conn = std::net::TcpStream::connect(addr)?;
self.conn = V0CRole::Dialer(conn.as_raw_fd());
Expand All @@ -118,6 +121,7 @@ impl V0Config {
}
}

/// It will create a listener and set the fd in the V0Config (for either listener or relay)
pub fn create_listener(&mut self, is_relay: bool) -> Result<(), anyhow::Error> {
let addr = format!("{}:{}", self.loc_addr, self.loc_port);

Expand All @@ -133,6 +137,7 @@ impl V0Config {
Ok(())
}

/// It will accept a connection and set the fd in the V0Config (for either listener or relay)
pub fn accept(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn);

Expand Down Expand Up @@ -166,6 +171,7 @@ impl V0Config {
}
}

/// It will close the connection to remote / accepted connection listened and exit gracefully
pub fn defer(&mut self) {
info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn);

Expand Down Expand Up @@ -193,6 +199,7 @@ impl V0Config {
}
}

/// It is used for listener and relay only, to reset the accepted connection in the migrated listener / relay
pub fn reset_listener_or_relay(&mut self) {
info!(
"[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...",
Expand All @@ -202,21 +209,15 @@ impl V0Config {
match self.conn {
V0CRole::Listener(_, ref mut accepted_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}
}
V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}

if *conn_fd != -1 {
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) };
drop(conn);
*conn_fd = -1; // set it back to default
}
}
Expand Down
1 change: 1 addition & 0 deletions examples/clients/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ clap = { version="4.2.1", features = ["derive"] }
anyhow = "1.0.7"
tracing = "0.1"
tracing-subscriber = "0.3.17"
tokio = { version = "1", features = ["full"] }

water = {path="../../../crates/water", version="0.1.0"}
77 changes: 45 additions & 32 deletions examples/clients/cli/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use water::config::{WATERConfig, WaterBinType};
use water::globals::{CONFIG_WASM_PATH, MAIN, WASM_PATH};
use water::runtime;
use water::runtime::client::WATERClient;

use clap::Parser;

Expand Down Expand Up @@ -47,52 +47,30 @@ pub fn parse() -> Result<WATERConfig, anyhow::Error> {
Ok(conf)
}

pub fn parse_and_execute() -> Result<(), anyhow::Error> {
execute(parse()?)
pub async fn parse_and_execute() -> Result<(), anyhow::Error> {
execute(parse()?).await
}

pub fn execute(_conf: WATERConfig) -> Result<(), anyhow::Error> {
let mut water_client = runtime::client::WATERClient::new(_conf).unwrap();
pub async fn execute(_conf: WATERConfig) -> Result<(), anyhow::Error> {
let mut water_client = WATERClient::new(_conf).unwrap();

match water_client.config.client_type {
WaterBinType::Dial => {
water_client.connect().unwrap();
}
WaterBinType::Runner => {
// generally for v1_preview shadowsocks client
water_client.execute().unwrap();
}
WaterBinType::Listen => {
water_client.listen().unwrap();
water_client.accept().unwrap();
water_client.cancel_with().unwrap();

let handle_water = water_client.run_worker().unwrap();

// taking input from terminal
loop {
let mut buf = vec![0; 1024];
let res = water_client.read(&mut buf);

if res.is_ok() {
let str_buf = String::from_utf8(buf).unwrap();
if str_buf.trim() == "exit" {
water_client.cancel().unwrap();
break;
}

println!("Received: {}", str_buf);
} else {
println!("Error: {}", res.unwrap_err());
}
water_client.accept().unwrap();
let next_water_client = water_client.keep_listen().unwrap();
handle_incoming(water_client).await.unwrap();
water_client = next_water_client;
}

match handle_water.join().unwrap() {
Ok(_) => {}
Err(e) => {
eprintln!("Running _water_worker ERROR: {}", e);
return Err(anyhow::anyhow!("Failed to join _water_worker thread"));
}
};
}
WaterBinType::Relay => {
water_client.listen().unwrap();
Expand Down Expand Up @@ -137,3 +115,38 @@ pub fn execute(_conf: WATERConfig) -> Result<(), anyhow::Error> {

Ok(())
}

pub async fn handle_incoming(mut water_client: WATERClient) -> Result<(), anyhow::Error> {
water_client.cancel_with().unwrap();

let handle_water = water_client.run_worker().unwrap();

// taking input from terminal
loop {
let mut buf = vec![0; 1024];
let res = water_client.read(&mut buf);

if res.is_ok() {
let str_buf = String::from_utf8(buf).unwrap();
if str_buf.trim() == "exit" {
water_client.cancel().unwrap();
break;
}

println!("Received: {}", str_buf);
} else {
println!("Error: {}", res.unwrap_err());
break;
}
}

match handle_water.join().unwrap() {
Ok(_) => {}
Err(e) => {
eprintln!("Running _water_worker ERROR: {}", e);
return Err(anyhow::anyhow!("Failed to join _water_worker thread"));
}
};

Ok(())
}
5 changes: 3 additions & 2 deletions examples/clients/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use tracing::Level;

mod cli;

fn main() -> Result<(), anyhow::Error> {
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();

cli::parse_and_execute()
cli::parse_and_execute().await
}
Loading

0 comments on commit a6fe4e8

Please sign in to comment.