Skip to content

Commit

Permalink
Refactor async io support
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 10, 2024
1 parent db6d3a6 commit fc83a88
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 535 deletions.
4 changes: 2 additions & 2 deletions ntex-async-std/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-async-std"
version = "0.5.0"
version = "0.5.1"
authors = ["ntex contributors <[email protected]>"]
description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -17,7 +17,7 @@ path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-io = "2.5"
ntex-util = "2.0"
log = "0.4"
async-std = { version = "1", features = ["unstable"] }
Expand Down
153 changes: 32 additions & 121 deletions ntex-async-std/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::{any, cell::RefCell, future::Future, io, pin::Pin, task::Context, task::Poll};
use std::future::{poll_fn, Future};
use std::{any, cell::RefCell, io, pin::Pin, task::Context, task::Poll};

use async_std::io::{Read, Write};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus};
use ntex_util::{ready, time::sleep, time::Sleep};

use crate::TcpStream;

impl IoStream for TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
async_std::task::spawn_local(ReadTask::new(self.clone(), read));
let mut rio = ReadTask(RefCell::new(self.clone()));
async_std::task::spawn_local(async move {
read.handle(&mut rio).await;
});
async_std::task::spawn_local(WriteTask::new(self.clone(), write));
Some(Box::new(self))
}
Expand All @@ -29,64 +31,17 @@ impl Handle for TcpStream {
}

/// Read io task
struct ReadTask {
io: RefCell<TcpStream>,
state: ReadContext,
}

impl ReadTask {
/// Create new read io task
fn new(io: TcpStream, state: ReadContext) -> Self {
Self {
state,
io: RefCell::new(io),
}
}
}

impl Future for ReadTask {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_ref();

match ready!(this.state.poll_ready(cx)) {
ReadStatus::Ready => {
this.state.with_buf(|buf, hw, lw| {
// read data from socket
let mut io = self.io.borrow_mut();
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}

return match poll_read_buf(Pin::new(&mut io.0), cx, buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("async-std stream is disconnected");
Poll::Ready(Ok(()))
} else if buf.len() < hw {
continue;
} else {
Poll::Pending
}
}
Poll::Ready(Err(err)) => {
log::trace!("async-std read task failed on io {:?}", err);
Poll::Ready(Err(err))
}
};
}
})
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
Poll::Ready(())
}
}
struct ReadTask(RefCell<TcpStream>);

impl ntex_io::AsyncRead for ReadTask {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0.borrow_mut();
poll_read_buf(Pin::new(&mut io.0), cx, &mut buf)
})
.await;
(buf, result)
}
}

Expand Down Expand Up @@ -342,71 +297,27 @@ mod unixstream {

impl IoStream for UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
async_std::task::spawn_local(ReadTask::new(self.clone(), read));
let mut rio = ReadTask(RefCell::new(self.clone()));
async_std::task::spawn_local(async move {
read.handle(&mut rio).await;
});
async_std::task::spawn_local(WriteTask::new(self, write));
None
}
}

/// Read io task
struct ReadTask {
io: RefCell<UnixStream>,
state: ReadContext,
}

impl ReadTask {
/// Create new read io task
fn new(io: UnixStream, state: ReadContext) -> Self {
Self {
state,
io: RefCell::new(io),
}
}
}

impl Future for ReadTask {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_ref();

this.state.with_buf(|buf, hw, lw| {
match ready!(this.state.poll_ready(cx)) {
ReadStatus::Ready => {
// read data from socket
let mut io = this.io.borrow_mut();
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}

return match poll_read_buf(Pin::new(&mut io.0), cx, buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("async-std stream is disconnected");
Poll::Ready(Ok(()))
} else if buf.len() < hw {
continue;
} else {
Poll::Pending
}
}
Poll::Ready(Err(err)) => {
log::trace!("read task failed on io {:?}", err);
Poll::Ready(Err(err))
}
};
}
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
Poll::Ready(Ok(()))
}
}
struct ReadTask(RefCell<UnixStream>);

impl ntex_io::AsyncRead for ReadTask {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0.borrow_mut();
poll_read_buf(Pin::new(&mut io.0), cx, &mut buf)
})
.await;
(buf, result)
}
}

Expand Down
4 changes: 2 additions & 2 deletions ntex-compio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-compio"
version = "0.1.1"
version = "0.1.2"
authors = ["ntex contributors <[email protected]>"]
description = "compio runtime intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -18,7 +18,7 @@ path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1"
ntex-io = "2.3"
ntex-io = "2.5"
ntex-util = "2"
log = "0.4"
compio-net = "0.4.1"
Expand Down
113 changes: 40 additions & 73 deletions ntex-compio/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,28 @@ use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::TcpStream;
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus};
use ntex_util::{future::select, future::Either, time::sleep};

impl IoStream for crate::TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut io = self.0.clone();
let io = self.0.clone();
compio::runtime::spawn(async move {
run(&mut io, &read, write).await;
let mut wr_io = io.clone();
let wr_task = compio::runtime::spawn(async move {
write_task(&mut wr_io, &write).await;
log::debug!("{} Write task is stopped", write.tag());
});
let mut io = ReadIo(io);

read.handle(&mut io).await;
log::debug!("{} Read task is stopped", read.tag());

if !wr_task.is_finished() {
let _ = wr_task.await;
}

match io.close().await {
match io.0.close().await {
Ok(_) => log::debug!("{} Stream is closed", read.tag()),
Err(e) => log::error!("{} Stream is closed, {:?}", read.tag(), e),
}
Expand All @@ -29,11 +39,24 @@ impl IoStream for crate::TcpStream {
#[cfg(unix)]
impl IoStream for crate::UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut io = self.0;
let io = self.0;
compio::runtime::spawn(async move {
run(&mut io, &read, write).await;
let mut wr_io = io.clone();
let wr_task = compio::runtime::spawn(async move {
write_task(&mut wr_io, &write).await;
log::debug!("{} Write task is stopped", write.tag());
});

let mut io = ReadIo(io);

match io.close().await {
read.handle(&mut io).await;
log::debug!("{} Read task is stopped", read.tag());

if !wr_task.is_finished() {
let _ = wr_task.await;
}

match io.0.close().await {
Ok(_) => log::debug!("{} Unix stream is closed", read.tag()),
Err(e) => log::error!("{} Unix stream is closed, {:?}", read.tag(), e),
}
Expand Down Expand Up @@ -88,71 +111,15 @@ impl SetBufInit for CompioBuf {
}
}

async fn run<T: AsyncRead + AsyncWrite + Clone + 'static>(
io: &mut T,
read: &ReadContext,
write: WriteContext,
) {
let mut wr_io = io.clone();
let wr_task = compio::runtime::spawn(async move {
write_task(&mut wr_io, &write).await;
log::debug!("{} Write task is stopped", write.tag());
});

read_task(io, read).await;
log::debug!("{} Read task is stopped", read.tag());

if !wr_task.is_finished() {
let _ = wr_task.await;
}
}

/// Read io task
async fn read_task<T: AsyncRead>(io: &mut T, state: &ReadContext) {
loop {
match state.ready().await {
ReadStatus::Ready => {
let result = state
.with_buf_async(|buf| async {
let BufResult(result, buf) =
match select(io.read(CompioBuf(buf)), state.wait_for_close())
.await
{
Either::Left(res) => res,
Either::Right(_) => return (Default::default(), Ok(1)),
};

match result {
Ok(n) => {
if n == 0 {
log::trace!(
"{}: Tcp stream is disconnected",
state.tag()
);
}
(buf.0, Ok(n))
}
Err(err) => {
log::trace!(
"{}: Read task failed on io {:?}",
state.tag(),
err
);
(buf.0, Err(err))
}
}
})
.await;
struct ReadIo<T: AsyncRead>(T);

if result.is_ready() {
break;
}
}
ReadStatus::Terminate => {
log::trace!("{}: Read task is instructed to shutdown", state.tag());
break;
}
}
impl<T> ntex_io::AsyncRead for ReadIo<T>
where
T: AsyncRead,
{
async fn read(&mut self, buf: BytesVec) -> (BytesVec, io::Result<usize>) {
let BufResult(result, buf) = self.0.read(CompioBuf(buf)).await;
(buf.0, result)
}
}

Expand Down
4 changes: 2 additions & 2 deletions ntex-glommio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-glommio"
version = "0.5.0"
version = "0.5.1"
authors = ["ntex contributors <[email protected]>"]
description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -17,7 +17,7 @@ path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1"
ntex-io = "2.0"
ntex-io = "2.5"
ntex-util = "2.0"
futures-lite = "2.2"
log = "0.4"
Expand Down
Loading

0 comments on commit fc83a88

Please sign in to comment.