Skip to content

Commit

Permalink
CLI tool update (#20)
Browse files Browse the repository at this point in the history
* update: working cli (v1 runner mode)

* update: cli tool

* update: fix small v0 multi listener bug + v0 multi listener tests + more cli dev

* fix: modify test for v0 multi listener

* update: cli tool for all roles, docs

* remove 🚧 in cli readme [skip ci]
  • Loading branch information
erikziyunchi authored Jan 10, 2024
1 parent c12f7b6 commit a5df51a
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 168 deletions.
20 changes: 10 additions & 10 deletions crates/water/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl WATERClient {
}

/// keep_listen is the function that is called when user wants to accept a newly income connection,
/// it creates a new WASM instance and migrate the previous listener to it. Used by v0 listener and relay for now.
/// it creates a new WASM instance and migrate the previous listener to it. -- v0_plus listener and relay for now.
pub fn keep_listen(&mut self) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERClient keep listening...",);

Expand Down Expand Up @@ -128,7 +128,7 @@ impl WATERClient {
self.debug = debug;
}

/// `connect` is the entry point for `Dialer` to connect to a remote address
/// `connect` is the function for `Dialer` to connect to a remote address
pub fn connect(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient connecting ...");

Expand Down Expand Up @@ -161,7 +161,7 @@ impl WATERClient {
Ok(())
}

/// `associate` is the entry point for `Relay` to associate with a remote addr
/// `associate` is the function for `Relay` to associate a remote connection
pub fn associate(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient relaying ...");

Expand All @@ -176,8 +176,8 @@ impl WATERClient {
Ok(())
}

/// `accept` is the entry point for `Listener` to accept a connection
/// called after `listen`
/// `accept` is the function for `Listener` to accept a connection
/// called after `listen()`
pub fn accept(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient accepting ...");

Expand All @@ -192,8 +192,8 @@ impl WATERClient {
Ok(())
}

/// `run_worker` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in a separate thread
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0 currently
/// `run_worker` is the function to run the entry_fn(a worker in WATM) in a separate thread and return the thread handle
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0_plus
pub fn run_worker(
&mut self,
) -> Result<std::thread::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
Expand All @@ -207,7 +207,7 @@ impl WATERClient {
}
}

/// `execute` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in the current thread
/// `execute` is the function to run the entry_fn(a worker in WATM) in the current thread
/// -- replace the thread running Host when running it <- used by v1 currently
pub fn execute(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient Executing ...");
Expand All @@ -229,7 +229,7 @@ impl WATERClient {
Ok(())
}

/// `cancel_with` is the function to set the pipe for canceling later -- v0
/// `cancel_with` is the function to set the cancel pipe for exiting later -- v0_plus
pub fn cancel_with(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient cancel_with ...");

Expand All @@ -251,7 +251,7 @@ impl WATERClient {
Ok(())
}

/// `cancel` is the function to cancel the thread running the entry_fn -- v0
/// `cancel` is the function to send thru the cancel_pipe and let the thread running the worker to exit -- v0_plus
pub fn cancel(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient canceling ...");

Expand Down
107 changes: 53 additions & 54 deletions crates/water/src/runtime/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,62 +75,10 @@ impl H2O<Host> {
return Err(anyhow::Error::msg("WATM module version not found"));
}

Self::setup_core(conf, linker, store, module, engine, version)
Self::create_core(conf, linker, store, module, engine, version)
}

/// This function is for migrating the v0 core for listener and relay
/// to handle every new connection will create a new separate core (as v0 spec)
pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O<Host>) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERCore H2O v0_migrating...");

// reseting the listener accepted_fd or the relay's accepted_fd & dial_fd
// when migrating from existed listener / relay
let version = match &core.version {
Version::V0(v0conf) => {
match v0conf {
Some(og_v0_conf) => match og_v0_conf.lock() {
Ok(og_v0_conf) => {
let mut new_v0_conf_inner = og_v0_conf.clone();
// reset the new cloned v0conf
new_v0_conf_inner.reset_listener_or_relay();

Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner))))
}
Err(e) => {
return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
}
},
None => {
return Err(anyhow::anyhow!("v0_conf is None"))?;
}
}
}
_ => {
return Err(anyhow::anyhow!("This is not a V0 core"))?;
}
};

// NOTE: Some of the followings can reuse the existing core, leave to later explore
let wasm_config = wasmtime::Config::new();

#[cfg(feature = "multithread")]
{
wasm_config.wasm_threads(true);
}

let engine = Engine::new(&wasm_config)?;
let linker: Linker<Host> = Linker::new(&engine);

let module = Module::from_file(&engine, &conf.filepath)?;

let host = Host::default();
let store = Store::new(&engine, host);

Self::setup_core(conf, linker, store, module, engine, Some(version))
}

/// called by init_core() or v0_migrate_core() to setup the core (reduce code duplication)
pub fn setup_core(
pub fn create_core(
conf: &WATERConfig,
mut linker: Linker<Host>,
mut store: Store<Host>,
Expand Down Expand Up @@ -213,6 +161,57 @@ impl H2O<Host> {
})
}

// This function is for migrating the v0 core for listener and relay
// to handle every new connection is creating a new separate core (as v0 spec)
pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O<Host>) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERCore H2O v0_migrating...");

// reseting the listener accepted_fd or the relay's accepted_fd & dial_fd
// when migrating from existed listener / relay
let version = match &core.version {
Version::V0(v0conf) => {
match v0conf {
Some(og_v0_conf) => match og_v0_conf.lock() {
Ok(og_v0_conf) => {
let mut new_v0_conf_inner = og_v0_conf.clone();
// reset the new cloned v0conf
new_v0_conf_inner.reset_listener_or_relay();

Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner))))
}
Err(e) => {
return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
}
},
None => {
return Err(anyhow::anyhow!("v0_conf is None"))?;
}
}
}
_ => {
return Err(anyhow::anyhow!("This is not a V0 core"))?;
}
};

// NOTE: Some of the followings can reuse the existing core, leave to later explore
let wasm_config = wasmtime::Config::new();

#[cfg(feature = "multithread")]
{
wasm_config.wasm_threads(true);
}

let engine = Engine::new(&wasm_config)?;
let linker: Linker<Host> = Linker::new(&engine);

let module = Module::from_file(&engine, &conf.filepath)?;

let host = Host::default();
let store = Store::new(&engine, host);

Self::create_core(conf, linker, store, module, engine, Some(version))
}

pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> {
self._init(conf.debug)?;
self._process_config(conf)?; // This is for now needed only by v1_preview
Expand Down
3 changes: 2 additions & 1 deletion crates/water/src/runtime/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub trait WATERTransportTrait: Send {
// read from WASM's caller_reader
match caller_io {
Some(ref mut caller_io) => match caller_io.read(buf) {
Ok(n) => Ok(n as i64),
Ok(n) if n > 0 => Ok(n as i64),
Ok(_) => Err(anyhow::Error::msg("Stream closed or read 0 bytes")),
Err(e) => Err(anyhow::Error::msg(format!(
"failed to read from caller_reader: {}",
e
Expand Down
17 changes: 9 additions & 8 deletions crates/water/src/runtime/v0/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ impl V0Config {
})
}

/// It will connect to the remote addr and set the fd in the V0Config
pub fn connect(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
let addr = format!("{}:{}", self.remote_addr, self.remote_port);

info!("[HOST] WATERCore V0 connecting to {}", addr);

match &mut self.conn {
// if the V0CRole is Relay, then it will remain as Relay
V0CRole::Relay(_, _, ref mut conn_fd) => {
// now relay has been built, need to dial
if *conn_fd != -1 {
Expand All @@ -109,6 +111,7 @@ impl V0Config {
*conn_fd = conn.as_raw_fd();
Ok(conn)
}
// if the V0CRole has not been set, and connect() was called, then it should be a dialer
V0CRole::Unknown => {
let conn = std::net::TcpStream::connect(addr)?;
self.conn = V0CRole::Dialer(conn.as_raw_fd());
Expand All @@ -118,6 +121,7 @@ impl V0Config {
}
}

/// It will create a listener and set the fd in the V0Config (for either listener or relay)
pub fn create_listener(&mut self, is_relay: bool) -> Result<(), anyhow::Error> {
let addr = format!("{}:{}", self.loc_addr, self.loc_port);

Expand All @@ -133,6 +137,7 @@ impl V0Config {
Ok(())
}

/// It will accept a connection and set the fd in the V0Config (for either listener or relay)
pub fn accept(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn);

Expand All @@ -146,7 +151,7 @@ impl V0Config {

let (stream, _) = listener.accept()?;

*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
*listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope
*accepted_fd = stream.as_raw_fd();

Ok(stream)
Expand All @@ -158,14 +163,15 @@ impl V0Config {

let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) };
let (stream, _) = listener.accept()?;
*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
*listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope
*accepted_fd = stream.as_raw_fd();
Ok(stream)
}
_ => Err(anyhow::Error::msg("not a listener")),
}
}

/// It will close the connection to remote / accepted connection listened and exit gracefully
pub fn defer(&mut self) {
info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn);

Expand Down Expand Up @@ -193,6 +199,7 @@ impl V0Config {
}
}

/// It is used for listener and relay only, to reset the accepted connection in the migrated listener / relay
pub fn reset_listener_or_relay(&mut self) {
info!(
"[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...",
Expand All @@ -202,21 +209,15 @@ impl V0Config {
match self.conn {
V0CRole::Listener(_, ref mut accepted_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}
}
V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}

if *conn_fd != -1 {
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) };
drop(conn);
*conn_fd = -1; // set it back to default
}
}
Expand Down
1 change: 1 addition & 0 deletions examples/clients/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ clap = { version="4.2.1", features = ["derive"] }
anyhow = "1.0.7"
tracing = "0.1"
tracing-subscriber = "0.3.17"
tokio = { version = "1", features = ["full"] }

water = {path="../../../crates/water", version="0.1.0"}
Loading

0 comments on commit a5df51a

Please sign in to comment.