Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI tool update #20

Merged
merged 36 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0063bb6
update: change namespaces to prefix
Oct 23, 2023
582d24d
update: v0+ API matching + plain runnable with Dialer
Oct 30, 2023
0302fa0
update: lint
Oct 30, 2023
d31964c
fix: fmt
Oct 30, 2023
400eb32
fix: clippy
Oct 30, 2023
177cbc0
fix: resolve conflicts from the prev PR
Nov 1, 2023
f2f1d72
update: keep the library code safe
Nov 1, 2023
fdec4f1
update: cleaning code + more sensible comments
erikziyunchi Nov 5, 2023
330f987
fix: clippy
erikziyunchi Nov 5, 2023
1e5f376
update: remove dup .wasm + link go-side runner for plain.wasm & PR fo…
erikziyunchi Nov 5, 2023
493061d
update: adding listener(base), tests, refactoring
erikziyunchi Nov 9, 2023
8447289
fix: fmt
erikziyunchi Nov 9, 2023
044eff6
fix: clippy
erikziyunchi Nov 9, 2023
cfddea1
fix: conflict
erikziyunchi Nov 9, 2023
20b832d
update: add relay, test
erikziyunchi Nov 10, 2023
8bbc3c7
fix: sleep 1 sec before terminating relay to allow the test finish + …
erikziyunchi Nov 10, 2023
d8bfa5f
fix: clippy & give a bit more time before terminate for relay test
erikziyunchi Nov 10, 2023
c2010a5
fix: isolate relay test with another test file
erikziyunchi Nov 10, 2023
3f569a3
fix: fmt
erikziyunchi Nov 10, 2023
90b5d93
fix: change port in relay test
erikziyunchi Nov 10, 2023
f6e7b46
update: add multi listener for v0
erikziyunchi Dec 18, 2023
cd43711
update: lint
erikziyunchi Dec 18, 2023
9e38b7a
fix: resolve conflict
erikziyunchi Dec 19, 2023
aa3bed6
fix: lint
erikziyunchi Dec 19, 2023
b169832
fix: new ss wasm binary + change port in test
erikziyunchi Dec 19, 2023
664e389
fix: fmt
erikziyunchi Dec 19, 2023
799932c
update: working cli (v1 runner mode)
erikziyunchi Dec 20, 2023
f2ac547
fmt + lint
erikziyunchi Dec 20, 2023
128a35b
resolve conflict
erikziyunchi Jan 9, 2024
a108dd8
update: cli tool
erikziyunchi Jan 9, 2024
c422064
fix: fmt
erikziyunchi Jan 9, 2024
a6fe4e8
update: fix small v0 multi listener bug + v0 multi listener tests + m…
erikziyunchi Jan 9, 2024
2a575d4
fix: modify test for v0 multi listener
erikziyunchi Jan 10, 2024
54162d2
update: cli tool for all roles, docs
erikziyunchi Jan 10, 2024
ac599e9
clean up
erikziyunchi Jan 10, 2024
beab99c
remove 🚧 in cli readme [skip ci]
erikziyunchi Jan 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions crates/water/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl WATERClient {
}

/// keep_listen is the function that is called when user wants to accept a newly income connection,
/// it creates a new WASM instance and migrate the previous listener to it. Used by v0 listener and relay for now.
/// it creates a new WASM instance and migrate the previous listener to it. -- v0_plus listener and relay for now.
pub fn keep_listen(&mut self) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERClient keep listening...",);

Expand Down Expand Up @@ -128,7 +128,7 @@ impl WATERClient {
self.debug = debug;
}

/// `connect` is the entry point for `Dialer` to connect to a remote address
/// `connect` is the function for `Dialer` to connect to a remote address
pub fn connect(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient connecting ...");

Expand Down Expand Up @@ -161,7 +161,7 @@ impl WATERClient {
Ok(())
}

/// `associate` is the entry point for `Relay` to associate with a remote addr
/// `associate` is the function for `Relay` to associate a remote connection
pub fn associate(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient relaying ...");

Expand All @@ -176,8 +176,8 @@ impl WATERClient {
Ok(())
}

/// `accept` is the entry point for `Listener` to accept a connection
/// called after `listen`
/// `accept` is the function for `Listener` to accept a connection
/// called after `listen()`
pub fn accept(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient accepting ...");

Expand All @@ -192,8 +192,8 @@ impl WATERClient {
Ok(())
}

/// `run_worker` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in a separate thread
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0 currently
/// `run_worker` is the function to run the entry_fn(a worker in WATM) in a separate thread and return the thread handle
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0_plus
pub fn run_worker(
&mut self,
) -> Result<std::thread::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
Expand All @@ -207,7 +207,7 @@ impl WATERClient {
}
}

/// `execute` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in the current thread
/// `execute` is the function to run the entry_fn(a worker in WATM) in the current thread
/// -- replace the thread running Host when running it <- used by v1 currently
pub fn execute(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient Executing ...");
Expand All @@ -229,7 +229,7 @@ impl WATERClient {
Ok(())
}

/// `cancel_with` is the function to set the pipe for canceling later -- v0
/// `cancel_with` is the function to set the cancel pipe for exiting later -- v0_plus
pub fn cancel_with(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient cancel_with ...");

Expand All @@ -251,7 +251,7 @@ impl WATERClient {
Ok(())
}

/// `cancel` is the function to cancel the thread running the entry_fn -- v0
/// `cancel` is the function to send thru the cancel_pipe and let the thread running the worker to exit -- v0_plus
pub fn cancel(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient canceling ...");

Expand Down
107 changes: 53 additions & 54 deletions crates/water/src/runtime/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,62 +75,10 @@ impl H2O<Host> {
return Err(anyhow::Error::msg("WATM module version not found"));
}

Self::setup_core(conf, linker, store, module, engine, version)
Self::create_core(conf, linker, store, module, engine, version)
}

/// This function is for migrating the v0 core for listener and relay
/// to handle every new connection will create a new separate core (as v0 spec)
pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O<Host>) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERCore H2O v0_migrating...");

// reseting the listener accepted_fd or the relay's accepted_fd & dial_fd
// when migrating from existed listener / relay
let version = match &core.version {
Version::V0(v0conf) => {
match v0conf {
Some(og_v0_conf) => match og_v0_conf.lock() {
Ok(og_v0_conf) => {
let mut new_v0_conf_inner = og_v0_conf.clone();
// reset the new cloned v0conf
new_v0_conf_inner.reset_listener_or_relay();

Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner))))
}
Err(e) => {
return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
}
},
None => {
return Err(anyhow::anyhow!("v0_conf is None"))?;
}
}
}
_ => {
return Err(anyhow::anyhow!("This is not a V0 core"))?;
}
};

// NOTE: Some of the followings can reuse the existing core, leave to later explore
let wasm_config = wasmtime::Config::new();

#[cfg(feature = "multithread")]
{
wasm_config.wasm_threads(true);
}

let engine = Engine::new(&wasm_config)?;
let linker: Linker<Host> = Linker::new(&engine);

let module = Module::from_file(&engine, &conf.filepath)?;

let host = Host::default();
let store = Store::new(&engine, host);

Self::setup_core(conf, linker, store, module, engine, Some(version))
}

/// called by init_core() or v0_migrate_core() to setup the core (reduce code duplication)
pub fn setup_core(
pub fn create_core(
conf: &WATERConfig,
mut linker: Linker<Host>,
mut store: Store<Host>,
Expand Down Expand Up @@ -213,6 +161,57 @@ impl H2O<Host> {
})
}

// This function is for migrating the v0 core for listener and relay
// to handle every new connection is creating a new separate core (as v0 spec)
pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O<Host>) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERCore H2O v0_migrating...");

// reseting the listener accepted_fd or the relay's accepted_fd & dial_fd
// when migrating from existed listener / relay
let version = match &core.version {
Version::V0(v0conf) => {
match v0conf {
Some(og_v0_conf) => match og_v0_conf.lock() {
Ok(og_v0_conf) => {
let mut new_v0_conf_inner = og_v0_conf.clone();
// reset the new cloned v0conf
new_v0_conf_inner.reset_listener_or_relay();

Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner))))
}
Err(e) => {
return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
}
},
None => {
return Err(anyhow::anyhow!("v0_conf is None"))?;
}
}
}
_ => {
return Err(anyhow::anyhow!("This is not a V0 core"))?;
}
};

// NOTE: Some of the followings can reuse the existing core, leave to later explore
let wasm_config = wasmtime::Config::new();

#[cfg(feature = "multithread")]
{
wasm_config.wasm_threads(true);
}

let engine = Engine::new(&wasm_config)?;
let linker: Linker<Host> = Linker::new(&engine);

let module = Module::from_file(&engine, &conf.filepath)?;

let host = Host::default();
let store = Store::new(&engine, host);

Self::create_core(conf, linker, store, module, engine, Some(version))
}

pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> {
self._init(conf.debug)?;
self._process_config(conf)?; // This is for now needed only by v1_preview
Expand Down
3 changes: 2 additions & 1 deletion crates/water/src/runtime/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub trait WATERTransportTrait: Send {
// read from WASM's caller_reader
match caller_io {
Some(ref mut caller_io) => match caller_io.read(buf) {
Ok(n) => Ok(n as i64),
Ok(n) if n > 0 => Ok(n as i64),
Ok(_) => Err(anyhow::Error::msg("Stream closed or read 0 bytes")),
Err(e) => Err(anyhow::Error::msg(format!(
"failed to read from caller_reader: {}",
e
Expand Down
17 changes: 9 additions & 8 deletions crates/water/src/runtime/v0/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ impl V0Config {
})
}

/// It will connect to the remote addr and set the fd in the V0Config
pub fn connect(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
let addr = format!("{}:{}", self.remote_addr, self.remote_port);

info!("[HOST] WATERCore V0 connecting to {}", addr);

match &mut self.conn {
// if the V0CRole is Relay, then it will remain as Relay
V0CRole::Relay(_, _, ref mut conn_fd) => {
// now relay has been built, need to dial
if *conn_fd != -1 {
erikziyunchi marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -109,6 +111,7 @@ impl V0Config {
*conn_fd = conn.as_raw_fd();
Ok(conn)
}
// if the V0CRole has not been set, and connect() was called, then it should be a dialer
V0CRole::Unknown => {
let conn = std::net::TcpStream::connect(addr)?;
self.conn = V0CRole::Dialer(conn.as_raw_fd());
Expand All @@ -118,6 +121,7 @@ impl V0Config {
}
}

/// It will create a listener and set the fd in the V0Config (for either listener or relay)
pub fn create_listener(&mut self, is_relay: bool) -> Result<(), anyhow::Error> {
let addr = format!("{}:{}", self.loc_addr, self.loc_port);

Expand All @@ -133,6 +137,7 @@ impl V0Config {
Ok(())
}

/// It will accept a connection and set the fd in the V0Config (for either listener or relay)
pub fn accept(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn);

Expand All @@ -146,7 +151,7 @@ impl V0Config {

let (stream, _) = listener.accept()?;

*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
*listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope
*accepted_fd = stream.as_raw_fd();

Ok(stream)
Expand All @@ -158,14 +163,15 @@ impl V0Config {

let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) };
let (stream, _) = listener.accept()?;
*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
*listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope
*accepted_fd = stream.as_raw_fd();
Ok(stream)
}
_ => Err(anyhow::Error::msg("not a listener")),
}
}

/// It will close the connection to remote / accepted connection listened and exit gracefully
pub fn defer(&mut self) {
info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn);

Expand Down Expand Up @@ -193,6 +199,7 @@ impl V0Config {
}
}

/// It is used for listener and relay only, to reset the accepted connection in the migrated listener / relay
pub fn reset_listener_or_relay(&mut self) {
info!(
"[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...",
Expand All @@ -202,21 +209,15 @@ impl V0Config {
match self.conn {
V0CRole::Listener(_, ref mut accepted_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}
}
V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}

if *conn_fd != -1 {
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) };
drop(conn);
*conn_fd = -1; // set it back to default
}
}
Expand Down
1 change: 1 addition & 0 deletions examples/clients/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ clap = { version="4.2.1", features = ["derive"] }
anyhow = "1.0.7"
tracing = "0.1"
tracing-subscriber = "0.3.17"
tokio = { version = "1", features = ["full"] }

water = {path="../../../crates/water", version="0.1.0"}
Loading