Skip to content

Commit

Permalink
update: match main
Browse files Browse the repository at this point in the history
  • Loading branch information
erikziyunchi committed Jan 7, 2024
2 parents bdb72a6 + 931e049 commit 8af50d1
Show file tree
Hide file tree
Showing 21 changed files with 1,017 additions and 14 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,9 @@ jobs:

# Build for wasm32-wasi target
- name: Build wasm32-wasi Target
run: |
for member in crates/wasm/ examples/water_bins/ss_client_wasm_v1/ examples/water_bins/echo_client/; do
cargo build --verbose --manifest-path $member/Cargo.toml --target wasm32-wasi
done
run: bash ./scripts/build_wasm_targets.sh
env:
RUSTFLAGS: --cfg tokio_unstable

- name: Test
run: cargo test --verbose --workspace --all-features
run: cargo test --verbose --workspace --all-features
5 changes: 5 additions & 0 deletions crates/wasm_v0/.cargo/config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[build]
target = "wasm32-wasi"

[target.wasm32-wasi]
rustflags = [ "--cfg", "tokio_unstable"]
28 changes: 28 additions & 0 deletions crates/wasm_v0/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "water-wasm-v0"
version = "0.1.0"
authors.workspace = true
description.workspace = true
edition.workspace = true

[lib]
name = "water_wasm_v0"
path = "src/lib.rs"
crate-type = ["cdylib", "lib"]

[dependencies]
tokio = { version = "1.33.0", default-features = false, features = ["fs", "net", "rt", "macros", "io-util", "io-std", "time", "sync"] }
tokio-util = { version = "0.7.1", features = ["codec"] }

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.107"
bincode = "1.3"

anyhow = "1.0.7"
tracing = "0.1"
tracing-subscriber = "0.3.17"
toml = "0.5.9"
lazy_static = "1.4"
url = { version = "2.2.2", features = ["serde"] }
libc = "0.2.147"

96 changes: 96 additions & 0 deletions crates/wasm_v0/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::os::fd::FromRawFd;
use tokio::net::TcpStream;

// WASI Imports
extern "C" {
pub fn host_accept() -> i32; // obtain a connection (specified by returned fd) accepted by the host
pub fn host_dial() -> i32; // obtain a connection (specified by returned fd) dialed by the host
pub fn host_defer(); // call when exiting
#[allow(dead_code)]
pub fn pull_config() -> i32; // obtain a configuration file (specified by returned fd) from the host
}

// enumerated constants for Role (i32)
// 0: unknown
// 1: dialer
// 2: listener
// 3: relay
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Role {
Unknown = 0,
Dialer = 1,
Listener = 2,
Relay = 3,
}

pub struct AsyncFdConn {
fd: i32,
temp_stream: Option<std::net::TcpStream>, // used to hold the std tcp stream, will be upgraded to tokio stream later
stream: Option<TcpStream>,
}

impl Default for AsyncFdConn {
fn default() -> Self {
Self::new()
}
}

impl AsyncFdConn {
pub fn new() -> Self {
AsyncFdConn {
fd: -1,
temp_stream: None,
stream: None,
}
}

pub fn wrap(&mut self, fd: i32) -> Result<(), String> {
if self.fd > 0 {
return Err("already wrapped".to_string());
}
if fd < 0 {
return Err("invalid fd".to_string());
}
self.fd = fd;
println!("wrap: fd = {}", fd);
let stdstream = unsafe { std::net::TcpStream::from_raw_fd(fd) };

self.temp_stream = Some(stdstream);
// println!("wrap: stdstream = {:?}", stdstream);
// stdstream
// .set_nonblocking(true)
// .expect("Failed to set non-blocking");

// println!("wrap: stream = {:?}", stdstream);
// self.stream =
// Some(TcpStream::from_std(stdstream).expect("Failed to convert to tokio stream"));
// Ok(())
Ok(())
}

pub fn tokio_upgrade(&mut self) -> Result<(), String> {
if self.fd < 0 {
return Err("invalid fd".to_string());
}
let stdstream = self.temp_stream.take().unwrap();
stdstream
.set_nonblocking(true)
.expect("Failed to set non-blocking");
self.stream =
Some(TcpStream::from_std(stdstream).expect("Failed to convert to tokio stream"));
Ok(())
}

pub fn close(&mut self) {
if self.fd < 0 {
return;
}
let stream = self.stream.take().unwrap();
drop(stream);
self.fd = -1;
}

pub fn stream(&mut self) -> Option<&mut TcpStream> {
self.stream.as_mut()
}
}
20 changes: 20 additions & 0 deletions crates/wasm_v0/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Error is a enum in i32
#[allow(dead_code)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum Error {
None = 0,
Unknown = -1, // general error
InvalidArgument = -2, // invalid argument supplied to func call
InvalidConfig = -3, // config file provided is invalid
InvalidFd = -4, // invalid file descriptor provided
InvalidFunction = -5, // invalid function called
DoubleInit = -6, // initializing twice
FailedIO = -7, // Failing an I/O operation
NotInitialized = -8, // not initialized
}

impl Error {
pub fn i32(&self) -> i32 {
*self as i32
}
}
3 changes: 3 additions & 0 deletions crates/wasm_v0/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod common;
pub mod error;
pub mod v0plus;
220 changes: 220 additions & 0 deletions crates/wasm_v0/src/v0plus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
use crate::{common::*, error};

pub const VERSION: i32 = 0x00000000; // v0plus share the same version number with v0

pub struct Dialer {
caller_conn: AsyncFdConn,
remote_conn: AsyncFdConn,
}

pub struct Listener {
caller_conn: AsyncFdConn,
source_conn: AsyncFdConn,
}
pub struct Relay {
source_conn: AsyncFdConn,
remote_conn: AsyncFdConn,
}

impl Default for Dialer {
fn default() -> Self {
Self::new()
}
}

impl Dialer {
pub fn new() -> Self {
Dialer {
caller_conn: AsyncFdConn::new(),
remote_conn: AsyncFdConn::new(),
}
}

pub fn dial(&mut self, caller_conn_fd: i32) -> Result<i32, String> {
// check if caller_conn_fd is valid
if caller_conn_fd < 0 {
return Err("invalid caller_conn_fd".to_string());
}
match self.caller_conn.wrap(caller_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// call external dial() to get remote_conn_fd
let remote_conn_fd = unsafe { host_dial() };
if remote_conn_fd < 0 {
return Err("dial failed".to_string());
}
match self.remote_conn.wrap(remote_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// return remote_conn_fd
Ok(remote_conn_fd)
}

// // borrow self.caller_conn
// pub fn caller(&mut self) -> Option<&mut TcpStream> {
// self.caller_conn.stream()
// }

// // borrow self.remote_conn
// pub fn remote(&mut self) -> Option<&mut TcpStream> {
// self.remote_conn.stream()
// }

pub fn close(&mut self) {
self.caller_conn.close();
self.remote_conn.close();
unsafe { host_defer() };
}
}

impl Default for Listener {
fn default() -> Self {
Self::new()
}
}

impl Listener {
pub fn new() -> Self {
Listener {
caller_conn: AsyncFdConn::new(),
source_conn: AsyncFdConn::new(),
}
}

pub fn accept(&mut self, caller_conn_fd: i32) -> Result<i32, String> {
// check if caller_conn_fd is valid
if caller_conn_fd < 0 {
return Err("Listener: invalid caller_conn_fd".to_string());
}

match self.caller_conn.wrap(caller_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// call external accept() to get source_conn_fd
let source_conn_fd = unsafe { host_accept() };
if source_conn_fd < 0 {
return Err("Listener: accept failed".to_string());
}

match self.source_conn.wrap(source_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// return source_conn_fd
Ok(source_conn_fd)
}

// // borrow self.caller_conn
// pub fn caller(&mut self) -> Option<&mut TcpStream> {
// self.caller_conn.stream()
// }

// // borrow self.source_conn
// pub fn source(&mut self) -> Option<&mut TcpStream> {
// self.source_conn.stream()
// }

pub fn close(&mut self) {
self.caller_conn.close();
self.source_conn.close();
unsafe { host_defer() };
}
}

impl Default for Relay {
fn default() -> Self {
Self::new()
}
}

impl Relay {
pub fn new() -> Self {
Relay {
source_conn: AsyncFdConn::new(),
remote_conn: AsyncFdConn::new(),
}
}

pub fn associate(&mut self) -> Result<i32, String> {
// call external accept() to get source_conn_fd
let source_conn_fd = unsafe { host_accept() };
if source_conn_fd < 0 {
return Err("Relay: accept failed".to_string());
}

match self.source_conn.wrap(source_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// call external dial() to get remote_conn_fd
let remote_conn_fd = unsafe { host_dial() };
if remote_conn_fd < 0 {
return Err("Relay: dial failed".to_string());
}
match self.remote_conn.wrap(remote_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// return remote_conn_fd
Ok(error::Error::None.i32())
}

// // borrow self.source_conn
// pub fn source(&mut self) -> Option<&mut TcpStream> {
// self.source_conn.stream()
// }

// // borrow self.remote_conn
// pub fn remote(&mut self) -> Option<&mut TcpStream> {
// self.remote_conn.stream()
// }

pub fn close(&mut self) {
self.source_conn.close();
self.remote_conn.close();
unsafe { host_defer() };
}
}

pub trait ConnPair {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)>;
}

impl ConnPair for Dialer {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> {
Some((&mut self.caller_conn, &mut self.remote_conn))
}
}

impl ConnPair for Listener {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> {
Some((&mut self.caller_conn, &mut self.source_conn))
}
}

impl ConnPair for Relay {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> {
Some((&mut self.source_conn, &mut self.remote_conn))
}
}
Loading

0 comments on commit 8af50d1

Please sign in to comment.