Skip to content

Commit

Permalink
feat: support poll-io feature to run epoll over iouring (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah authored Feb 1, 2024
1 parent c332fb2 commit 137c173
Show file tree
Hide file tree
Showing 56 changed files with 1,474 additions and 559 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ members = [

# Internal
"examples",
"examples/tokio-io-compat",
]
resolver = "2"
50 changes: 31 additions & 19 deletions docs/en/compatiable-with-tokio-eco.md
Original file line number Diff line number Diff line change
@@ -1,38 +1,50 @@
---
title: Compatible with Tokio ecology
date: 2021-12-17 18:00:00
updated: 2024-01-30 15:00:00
author: ihciah
---

# Compatible with Tokio ecology
A large number of existing Rust components are compatible with Tokio, and they directly rely on Tokio's `AsyncRead` and `AsyncWrite` interfaces.
# Compatible with the Tokio Ecosystem
A large number of existing Rust components are compatible with Tokio and directly depend on Tokio's `AsyncRead` and `AsyncWrite` traits.

In Monoio, since the bottom layer is an asynchronous system call, we chose a similar tokio-uring approach: providing an IO interface that transfers the ownership of the buffer. But at this stage, obviously there are not many available libraries that can work, so we need to quickly support functions with a certain performance sacrifice.
In Monoio, due to the underlying asynchronous system calls, we chose an approach similar to tokio-uring: providing IO interfaces that transfer buffer ownership. However, at this stage, there are obviously not many libraries available that work with this, so we need some means to be compatible with existing interface components.

## monoio-compat
`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on the buffer ownership interface.
Currently, there are 3 ways to achieve compatibility:

## tokio-compat
`tokio-compat` is a feature of monoio. When the `iouring` is disabled and the `legacy` feature is enabled, after turning on the `tokio-compat` feature, `TcpStream`/`UnixStream` will implement `tokio::io::{AsyncRead, AsyncWrite}`.

If you explicitly do not use iouring, then you can provide compatibility in this way. This form of compatibility has no overhead. If you might use iouring, then you should use the `poll-io` feature.

### How it works
For `poll_read`, the remaining capacity of the slice passed by the user will be read first, and then the buffer held by the user will be limited to this capacity and a future will be generated. After that, every time the user `poll_read` again, it will be forwarded to the `poll` method of this future. When returning to `Ready`, the contents of the buffer are copied to the slice passed by the user.
## poll-io
`poll-io` is a feature of monoio. After enabling this feature:
1. `tokio::io` will be re-exported to `monoio::io::poll_io`
2. `TcpStream`/`UnixStream` can be converted to and from `TcpStreamPoll`/`UnixStreamPoll`
3. `TcpStreamPoll`/`UnixStreamPoll` implements `tokio::io::{AsyncRead, AsyncWrite}`

For `poll_write`, the content of the slice passed by the user is copied to its own buffer first, then a future is generated and stored, and Ready is returned immediately. After that, every time the user `poll_write` again, it will first wait for the last content to be sent, then copy the data and return to Ready immediately. It behaves like BufWriter, and may causing errors to be delayed to be perceived.
The underlying implementation of this feature runs epoll to sense fd readiness on top of iouring and directly initiates a syscall. Although this form of compatibility cannot effectively utilize iouring for asynchronous io, its performance is similar to other epoll+syscall implementations without additional copy overhead.

## monoio-compat
`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on an interface that transfers buffer ownership.

At the cost, using this compatibility layer will cost you an extra buffer copy.
### How It Works
For `poll_read`, it first reads into the remaining capacity of the slice passed by the user, then restricts its own buffer to that capacity and generates a future. Afterwards, every time the user again calls `poll_read`, it will forward to the `poll` method of this future. When returning `Ready`, it copies the contents of the buffer into the user's passed slice.

For `poll_write`, the content of the slice passed by the user is first copied to its own buffer, and then a future is generated and stored. After that, every time the user `poll_write` again, it will be forwarded to the `poll` method of this future. Return the result to the user when returning `Ready`.
For `poll_write`, it first copies the contents of the user's passed slice into its own buffer, then generates a future and stores it, and immediately returns Ready. Thereafter, every time the user again calls `poll_write`, it will first wait for the content of the last write to be fully sent before copying data and immediately returning Ready. This behavior is similar to that of a BufWriter, which can lead to delayed error detection.

In other words, using this compatibility layer will cost you an extra buffer copy overhead.
The cost of using this compatibility layer is an additional buffer copy overhead.

### Usage restrictions
For write operations, users need to manually call poll_flush or poll_shutdown of AsyncWrite at the end (or the corresponding flush or shutdown methods of AsyncWriteExt), otherwise the data may not be submitted to the kernel (continuous writes do not require manual flushing).
### Usage Restrictions
For write operations, users need to manually call AsyncWrite's poll_flush or poll_shutdown (or the corresponding flush or shutdown methods in AsyncWriteExt) at the end; otherwise, data may not be submitted to the kernel (continuous writes do not require manual flushing).

## Poll-oriented interface and asynchronous system call
There are two ways to express asynchrony in Rust, one is based on `poll` and the other is based on `async`. `poll` is synchronous, semantically expressing an immediate attempt; while `async` is essentially the syntactic sugar of poll, it will swallow the future and generate a state machine, which is executed in a loop during await.
## Poll-Based Interfaces and Asynchronous System Calls
There are two ways to express asynchrony in Rust: one is based on `poll`, and the other is based on `async`. `Poll` is synchronous and semantically expresses the trying immediately; while `async` is essentially syntactic sugar for poll which encapsulates the future and generates a state machine, executing this state machine in a loop when awaiting.

In Tokio, there are methods similar to `poll_read` and `poll_write`, both of which express the semantics of synchronous attempts.
In Tokio, there are methods like `poll_read` and `poll_write` that both express the semantic of synchronous trying.

When it returns to `Pending`, it means that IO is not ready (and the waker of cx is registered for notification), and when it returns to `Ready` it means that IO has been completed. It is easy to implement these two interfaces based on synchronous system calls. Directly make the corresponding system calls and determine the return result. If the IO is not ready, it will be suspended to Reactor.
When `Pending` is returned, it implies that the IO is not ready (and registers a notice with the waker in cx), and when `Ready` is returned, it means the IO has completed. It is easy to implement these two interfaces based on synchronous system calls, by directly making the corresponding system call and judging the return result, and if the IO is not ready, suspend to the Reactor.

However, these two interfaces are difficult to implement under asynchronous system calls. If we have pushed an Op into the io_uring SQ, the state of this syscall is uncertain before the corresponding CQE is consumed. We have no way to provide clear completed or unfinished semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities has led to our use restrictions. Under asynchronous system calls, it is more appropriate to pass the ownership of the buffer and cooperate with `async+await`.
However, these two interfaces are difficult to implement under asynchronous system calls. If we have already pushed an Op into the io_uring SQ, then the status of that syscall is uncertain until we consume the corresponding CQE. We cannot provide a clear completed or not completed semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities leads to our usage restrictions. Under asynchronous system calls, transferring buffer ownership in combination with `async+await` is more appropriate.

At present, the Rust standard library does not have a interface for asynchronous system calls, and there is no related component ecology. We are working hard to solve this problem.
Currently, Rust's standard library does not have a universal interface oriented towards asynchronous system calls, and neither does the related component ecosystem. We are working hard to improve this problem.
18 changes: 17 additions & 1 deletion docs/zh/compatiable-with-tokio-eco.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
---
title: 与 Tokio 生态兼容
date: 2021-12-17 18:00:00
updated: 2024-01-30 15:00:00
author: ihciah
---

# 与 Tokio 生态兼容
现有 Rust 组件中有大量的组件与 Tokio 是兼容的,它们直接依赖了 Tokio 的 `AsyncRead``AsyncWrite` 接口。

而在 Monoio,由于底层是异步系统调用,所以我们选择了类似 tokio-uring 的做法:提供传 buffer 所有权的 IO 接口。但现阶段明显没有很多可用的库可以工作,所以我们需要以一定的性能牺牲来快速支持功能。
而在 Monoio,由于底层是异步系统调用,所以我们选择了类似 tokio-uring 的做法:提供传 buffer 所有权的 IO 接口。但现阶段明显没有很多可用的库可以工作,所以我们需要一些手段来兼容现有接口的组件。

当前共有 3 种兼容手段:

## tokio-compat
`tokio-compat` 是 monoio 的一个 feature。当关闭 `iouring` 且打开 `legacy` feature 时,开启 `tokio-compat` feature 后,`TcpStream`/`UnixStream` 会实现 `tokio::io::{AsyncRead, AsyncWrite}`

如果你明确不使用 iouring,那么你可以通过这种方式提供兼容性。这种形式的兼容性是没有 overhead 的。如果你有可能会使用 iouring,那么你应该使用 `poll-io` feature。

## poll-io
`poll-io` 是 monoio 的一个 feature。当开启该 feature 后:
1. `tokio::io` 会被 reexport 到 `monoio::io::poll_io`
2. `TcpStream`/`UnixStream` 可以与 `TcpStreamPoll`/`UnixStreamPoll` 互相转换
3. `TcpStreamPoll`/`UnixStreamPoll` 实现 `tokio::io::{AsyncRead, AsyncWrite}`

这个 feature 的底层实现是在 iouring 上运行 epoll 来感知 fd readiness,并直接发起 syscall。这种形式的兼容虽然无法有效利用 iouring 做异步 io,但它的性能与其他基于 epoll+syscall 的实现是类似的,没有额外的拷贝开销。

## monoio-compat
`monoio-compat` 是一个兼容层,它基于 buffer 所有权的接口实现 Tokio 的 `AsyncRead``AsyncWrite`
Expand Down
14 changes: 10 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ monoio = { path = "../monoio", default-features = false, features = [
"legacy",
"macros",
"utils",
"poll-io", # experimental
] }

# Enable tracing and tracing-subscriber for print out runtime debug
Expand All @@ -25,15 +26,16 @@ monoio = { path = "../monoio", default-features = false, features = [
# tracing-subscriber = "0.3"

# For hyper examples
hyper = { version = "0.14", features = ["http1", "client", "server", "stream"] }
hyper = { version = "1.1", features = ["http1", "client", "server"] }
http-body-util = "0.1"

# For h2 examples
bytes = { version = "1" }
h2 = { version = "0.3" }
http = { version = "0.2" }
h2 = { version = "0.4" }
http = { version = "1" }

# For hyper and h2 examples
monoio-compat = { path = "../monoio-compat" }
monoio-compat = { path = "../monoio-compat", features = ["hyper"] }

tokio = { version = "1", default-features = false, features = ["io-util"] }
tower-service = "0.3"
Expand Down Expand Up @@ -70,6 +72,10 @@ path = "echo.rs"
name = "echo-tfo"
path = "echo_tfo.rs"

[[example]]
name = "echo-poll"
path = "echo_poll.rs"

[[example]]
name = "join"
path = "join.rs"
Expand Down
48 changes: 48 additions & 0 deletions examples/echo_poll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! An echo example.
//!
//! Run the example and `nc 127.0.0.1 50002` in another shell.
//! All your input will be echoed out.

use monoio::{
io::{
poll_io::{AsyncReadExt, AsyncWriteExt},
IntoPollIo,
},
net::{TcpListener, TcpStream},
};

#[monoio::main(driver = "fusion")]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:50002").unwrap();
println!("listening");
loop {
let incoming = listener.accept().await;
match incoming {
Ok((stream, addr)) => {
println!("accepted a connection from {addr}");
monoio::spawn(echo(stream));
}
Err(e) => {
println!("accepted connection failed: {e}");
return;
}
}
}
}

async fn echo(stream: TcpStream) -> std::io::Result<()> {
// Convert completion-based io to poll-based io(which impl tokio::io)
let mut stream = stream.into_poll_io()?;
let mut buf: Vec<u8> = vec![0; 1024];
let mut res;
loop {
// read
res = stream.read(&mut buf).await?;
if res == 0 {
return Ok(());
}

// write all
stream.write_all(&buf[0..res]).await?;
}
}
149 changes: 41 additions & 108 deletions examples/hyper_client.rs
Original file line number Diff line number Diff line change
@@ -1,127 +1,60 @@
//! HTTP client example with hyper in compatible mode.
//! HTTP client example with hyper in poll-io mode.
//!
//! It will try to fetch http://127.0.0.1:23300/monoio and print the
//! It will try to fetch http://httpbin.org/ip and print the
//! response.
//!
//! Note:
//! It is not recommended to use this example as a production code.
//! The `hyper` require `Send` for a future and obviously the future
//! is not `Send` in monoio. So we just use some unsafe code to let
//! it pass which infact not a good solution but the only way to
//! make it work without modifying hyper.

use std::{future::Future, pin::Pin};
use std::io::Write;

use monoio_compat::TcpStreamCompat;
use bytes::Bytes;
use http_body_util::{BodyExt, Empty};
use hyper::Request;
use monoio::{io::IntoPollIo, net::TcpStream};
use monoio_compat::hyper::MonoioIo;

#[derive(Clone)]
struct HyperExecutor;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

impl<F> hyper::rt::Executor<F> for HyperExecutor
where
F: Future + 'static,
F::Output: 'static,
{
fn execute(&self, fut: F) {
monoio::spawn(fut);
}
}
async fn fetch_url(url: hyper::Uri) -> Result<()> {
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?.into_poll_io()?;
let io = MonoioIo::new(stream);

#[derive(Clone)]
struct HyperConnector;
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
monoio::spawn(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});

impl tower_service::Service<hyper::Uri> for HyperConnector {
type Response = HyperConnection;
let authority = url.authority().unwrap().clone();

type Error = std::io::Error;
let path = url.path();
let req = Request::builder()
.uri(path)
.header(hyper::header::HOST, authority.as_str())
.body(Empty::<Bytes>::new())?;

#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(
&mut self,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
let mut res = sender.send_request(req).await?;

fn call(&mut self, uri: hyper::Uri) -> Self::Future {
let host = uri.host().unwrap();
let port = uri.port_u16().unwrap_or(80);
let address = format!("{host}:{port}");
println!("Response: {}", res.status());
println!("Headers: {:#?}\n", res.headers());

#[allow(clippy::type_complexity)]
let b: Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>> =
Box::pin(async move {
let conn = monoio::net::TcpStream::connect(address).await?;
let hyper_conn = HyperConnection(TcpStreamCompat::new(conn));
Ok(hyper_conn)
});
unsafe { std::mem::transmute(b) }
// Stream the body, writing each chunk to stdout as we get it
// (instead of buffering and printing at the end).
while let Some(next) = res.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
std::io::stdout().write_all(chunk)?;
}
}
}

struct HyperConnection(TcpStreamCompat);
println!("\n\nDone!");

impl tokio::io::AsyncRead for HyperConnection {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
Ok(())
}

impl tokio::io::AsyncWrite for HyperConnection {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}

impl hyper::client::connect::Connection for HyperConnection {
fn connected(&self) -> hyper::client::connect::Connected {
hyper::client::connect::Connected::new()
}
}

#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for HyperConnection {}

#[monoio::main]
async fn main() {
println!("Running http client");
let connector = HyperConnector;
let client = hyper::Client::builder()
.executor(HyperExecutor)
.build::<HyperConnector, hyper::Body>(connector);
let res = client
.get("http://127.0.0.1:23300/monoio".parse().unwrap())
.await
.expect("failed to fetch");
println!("Response status: {}", res.status());
let body = hyper::body::to_bytes(res.into_body())
.await
.expect("failed to read body");
let body =
String::from_utf8(body.into_iter().collect()).expect("failed to convert body to string");
println!("Response body: {body}");
let url = "http://httpbin.org/ip".parse::<hyper::Uri>().unwrap();
fetch_url(url).await.unwrap();
}
Loading

0 comments on commit 137c173

Please sign in to comment.