Skip to content

Commit

Permalink
Initial support for compio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Aug 26, 2024
1 parent c71678c commit 24a17dc
Show file tree
Hide file tree
Showing 27 changed files with 851 additions and 124 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/cov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ jobs:
- name: Code coverage (glommio)
run: cargo +nightly llvm-cov --no-report --all --no-default-features --features="glommio,cookie,url,compress,openssl,rustls,ws,brotli"

- name: Code coverage (compio)
run: cargo +nightly llvm-cov --no-report --all --no-default-features --features="compio,cookie,url,compress,openssl,rustls,ws,brotli"

- name: Code coverage
run: RUST_LOG=trace cargo +nightly llvm-cov --no-report --all --doctests --no-default-features --features="tokio,cookie,url,compress,openssl,rustls,ws,brotli"

Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ jobs:
timeout-minutes: 40
run: cargo test --all --all-features --no-fail-fast -- --nocapture

- name: Run compio tests
timeout-minutes: 40
run: |
cd ntex
cargo test --no-default-features --no-fail-fast --features="compio,cookie,url,compress,openssl,rustls,ws,brotli"
- name: Run async-std tests
timeout-minutes: 40
continue-on-error: true
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/osx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ jobs:
- name: Run tests
run: cargo test --all --all-features --no-fail-fast -- --nocapture

- name: Run compio tests
timeout-minutes: 40
run: |
cd ntex
cargo test --no-default-features --no-fail-fast --features="compio,cookie,url,compress,openssl,rustls,ws,brotli"
- name: Install cargo-cache
continue-on-error: true
run: |
Expand Down
7 changes: 7 additions & 0 deletions .github/workflows/windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,10 @@ jobs:
- name: Run tests
run: |
cargo test --lib --all-features --no-fail-fast -- --nocapture --skip test_panic_in_worker --skip test_connection_force_close --skip test_connection_server_close --skip test_freeze --skip test_simple --skip test_test_methods --skip test_connection_wait_queue_force_close --skip test_params --skip test_body --skip test_form --skip test_json --skip test_connection_reuse --skip test_connection_wait_queue --skip test_no_decompress --skip test_connection_reuse_h2 --skip test_h2_tcp --skip test_timer
- name: Run compio tests
timeout-minutes: 40
continue-on-error: true
run: |
cd ntex
cargo test --no-default-features --no-fail-fast --features="compio,cookie,url,compress,openssl,rustls,ws,brotli"
13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ members = [
"ntex-tls",
"ntex-macros",
"ntex-util",
"ntex-glommio",
"ntex-tokio",
"ntex-compio",
"ntex-async-std",
"ntex-glommio",
]

[patch.crates-io]
Expand All @@ -34,6 +35,14 @@ ntex-tls = { path = "ntex-tls" }
ntex-macros = { path = "ntex-macros" }
ntex-util = { path = "ntex-util" }

ntex-glommio = { path = "ntex-glommio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-compio = { path = "ntex-compio" }
ntex-glommio = { path = "ntex-glommio" }
ntex-async-std = { path = "ntex-async-std" }

compio-io = { git = "https://github.com/fafhrd91/compio.git" }
compio-fs = { git = "https://github.com/fafhrd91/compio.git" }
compio-buf = { git = "https://github.com/fafhrd91/compio.git" }
compio-net = { git = "https://github.com/fafhrd91/compio.git" }
compio-driver = { git = "https://github.com/fafhrd91/compio.git" }
compio-runtime = { git = "https://github.com/fafhrd91/compio.git" }
5 changes: 5 additions & 0 deletions ntex-compio/CHANGES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changes

## [0.1.0] - 2024-08-xx

* Initial release
23 changes: 23 additions & 0 deletions ntex-compio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "ntex-compio"
version = "0.1.0"
authors = ["ntex contributors <[email protected]>"]
description = "compio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://ntex.rs"
repository = "https://github.com/ntex-rs/ntex.git"
documentation = "https://docs.rs/ntex-compio/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2021"

[lib]
name = "ntex_compio"
path = "src/lib.rs"

[dependencies]
ntex-bytes = "0.1"
ntex-io = "2.3"
ntex-util = "2"
log = "0.4"
compio = { version = "0.11.0", features = ["macros", "io", "runtime"] }
1 change: 1 addition & 0 deletions ntex-compio/LICENSE-APACHE
1 change: 1 addition & 0 deletions ntex-compio/LICENSE-MIT
231 changes: 231 additions & 0 deletions ntex-compio/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use std::{any, io};

use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::TcpStream;
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_util::{future::select, future::Either, time::sleep};

impl IoStream for crate::TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut wr_io = self.0.clone();
let wr_task = compio::runtime::spawn(async move {
write_task(&mut wr_io, &write).await;
log::debug!("{} Write task is stopped", write.tag());
});

let mut rd_io = self.0.clone();
compio::runtime::spawn(async move {
read_task(&mut rd_io, &read).await;
log::debug!("{} Read task is stopped", read.tag());

if !wr_task.is_finished() {
let _ = wr_task.await;
}

let res = rd_io.close().await;
log::debug!("{} Stream is closed, {:?}", read.tag(), res);
})
.detach();

Some(Box::new(HandleWrapper(self.0)))
}
}

#[cfg(unix)]
impl IoStream for crate::UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let mut rd_io = self.0.clone();
compio::runtime::spawn(async move {
read_task(&mut rd_io, &read).await;
let _ = rd_io.close().await;
})
.detach();

let mut wr_io = self.0;
compio::runtime::spawn(async move {
write_task(&mut wr_io, &write).await;
let _ = wr_io.close().await;
})
.detach();
None
}
}

struct HandleWrapper(TcpStream);

impl Handle for HandleWrapper {
fn query(&self, id: any::TypeId) -> Option<Box<dyn any::Any>> {
if id == any::TypeId::of::<types::PeerAddr>() {
if let Ok(addr) = self.0.peer_addr() {
return Some(Box::new(types::PeerAddr(addr)));
}
}
None
}
}

struct CompioBuf(BytesVec);

unsafe impl IoBuf for CompioBuf {
#[inline]
fn as_buf_ptr(&self) -> *const u8 {
self.0.chunk().as_ptr()
}

#[inline]
fn buf_len(&self) -> usize {
self.0.len()
}

#[inline]
fn buf_capacity(&self) -> usize {
self.0.remaining_mut()
}
}

unsafe impl IoBufMut for CompioBuf {
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
self.0.chunk_mut().as_mut_ptr()
}
}

impl SetBufInit for CompioBuf {
unsafe fn set_buf_init(&mut self, len: usize) {
self.0.set_len(len + self.0.len());
}
}

/// Read io task
async fn read_task<T: AsyncRead>(io: &mut T, state: &ReadContext) {
loop {
match state.ready().await {
ReadStatus::Ready => {
let result = state
.with_buf_async(|buf| async {
let BufResult(result, buf) =
match select(io.read(CompioBuf(buf)), state.wait()).await {
Either::Left(res) => res,
Either::Right(_) => return (Default::default(), Ok(1)),
};

match result {
Ok(n) => {
if n == 0 {
log::trace!(
"{}: Tcp stream is disconnected",
state.tag()
);
}
(buf.0, Ok(n))
}
Err(err) => {
log::trace!(
"{}: Read task failed on io {:?}",
state.tag(),
err
);
(buf.0, Err(err))
}
}
})
.await;

if result.is_ready() {
break;
}
}
ReadStatus::Terminate => {
log::trace!("{}: Read task is instructed to shutdown", state.tag());
break;
}
}
}
}

/// Write io task
async fn write_task<T: AsyncWrite>(mut io: T, state: &WriteContext) {
let mut delay = None;

loop {
let result = if let Some(ref mut sleep) = delay {
let result = match select(sleep, state.ready()).await {
Either::Left(_) => {
state.close(Some(io::Error::new(
io::ErrorKind::TimedOut,
"Operation timedout",
)));
return;
}
Either::Right(res) => res,
};
delay = None;
result
} else {
state.ready().await
};

match result {
WriteStatus::Ready => {
// write io stream
let result = state
.with_buf_async(|buf| async {
let mut buf = CompioBuf(buf);
loop {
let BufResult(result, buf1) = io.write(buf).await;
buf = buf1;

match result {
Ok(size) => {
if buf.0.len() == size {
return io.flush().await;
}
if size == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
));
}
buf.0.advance(size);
}
Err(e) => return Err(e),
}
}
})
.await;

match result {
Ok(()) => continue,
Err(e) => {
state.close(Some(e));
}
}
}
WriteStatus::Timeout(time) => {
log::trace!("{}: Initiate timeout delay for {:?}", state.tag(), time);
delay = Some(sleep(time));
continue;
}
WriteStatus::Shutdown(time) => {
log::trace!("{}: Write task is instructed to shutdown", state.tag());

if let Err(err) = io.flush().await {
state.close(Some(err));
} else {
match select(sleep(time), io.shutdown()).await {
Either::Left(_) => state.close(None),
Either::Right(res) => state.close(res.err()),
}
}
}
WriteStatus::Terminate => {
log::trace!("{}: Write task is instructed to terminate", state.tag());
state.close(io.shutdown().await.err());
}
}
break;
}
}
Loading

0 comments on commit 24a17dc

Please sign in to comment.