From 0624bea6b71189037acc68b62f9e263ba6c89c50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?ihc=E7=AB=A5=E9=9E=8B=40=E6=8F=90=E4=B8=8D=E8=B5=B7?= =?UTF-8?q?=E5=8A=B2?= Date: Tue, 11 Jul 2023 19:37:52 +0800 Subject: [PATCH] feat: support timeout with enter args (#190) --- examples/Cargo.toml | 23 +++++++++++++++------- monoio/Cargo.toml | 6 +++++- monoio/src/driver/uring/mod.rs | 35 +++++++++++++++++++++++++++------- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ee9c6db5..cb5b94b0 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -9,7 +9,16 @@ version = "0.0.0" # [dependencies] instead. In additional, if you want to know how runtime # works, you can enable "debug" feature. [dev-dependencies] -monoio = {path = "../monoio", default-features = false, features = ["async-cancel", "sync", "bytes", "iouring", "legacy", "macros", "utils"]} +monoio = { path = "../monoio", default-features = false, features = [ + "async-cancel", + "sync", + "bytes", + "iouring", + "legacy", + "macros", + "utils", + "enter-args", +] } # Enable tracing and tracing-subscriber for print out runtime debug # tracing information. Add these only when you enable "debug" feature. @@ -17,17 +26,17 @@ monoio = {path = "../monoio", default-features = false, features = ["async-cance # tracing-subscriber = "0.3" # For hyper examples -hyper = {version = "0.14", features = ["http1", "client", "server", "stream"]} +hyper = { version = "0.14", features = ["http1", "client", "server", "stream"] } # For h2 examples -bytes = {version = "1"} -h2 = {version = "0.3"} -http = {version = "0.2"} +bytes = { version = "1" } +h2 = { version = "0.3" } +http = { version = "0.2" } # For hyper and h2 examples -monoio-compat = {path = "../monoio-compat"} +monoio-compat = { path = "../monoio-compat" } -tokio = {version = "1", default-features = false, features = ["io-util"]} +tokio = { version = "1", default-features = false, features = ["io-util"] } tower-service = "0.3" futures = "0.3" diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 8e6f9a12..04262d22 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -57,7 +57,11 @@ async-cancel = [] # enanle zero copy(enable SOCK_ZEROCOPY + MSG_ZEROCOPY flag) # WARNING: this feature may cause performance degradation zero-copy = [] -# splice op(require kernel 5.7+) +# will use enter+args to park with timeout on uring driver +# better performance but requires 5.11+ +# has no effect on legacy driver +enter-args = [] +# splice op(requires kernel 5.7+) splice = [] # enable `async main` macros support macros = ["monoio-macros"] diff --git a/monoio/src/driver/uring/mod.rs b/monoio/src/driver/uring/mod.rs index a9fba7d6..f93e13e3 100644 --- a/monoio/src/driver/uring/mod.rs +++ b/monoio/src/driver/uring/mod.rs @@ -10,7 +10,7 @@ use std::{ time::Duration, }; -use io_uring::{cqueue, opcode, types::Timespec, IoUring}; +use io_uring::{cqueue, types::Timespec, IoUring}; use lifecycle::Lifecycle; use super::{ @@ -28,6 +28,7 @@ pub(crate) use waker::UnparkHandle; #[allow(unused)] pub(crate) const CANCEL_USERDATA: u64 = u64::MAX; +#[cfg(not(feature = "enter-args"))] pub(crate) const TIMEOUT_USERDATA: u64 = u64::MAX - 1; #[allow(unused)] pub(crate) const EVENTFD_USERDATA: u64 = u64::MAX - 2; @@ -160,7 +161,7 @@ impl IoUringDriver { #[cfg(feature = "sync")] fn install_eventfd(&self, inner: &mut UringInner, fd: RawFd) { - let entry = opcode::Read::new(io_uring::types::Fd(fd), self.eventfd_read_dst, 8) + let entry = io_uring::opcode::Read::new(io_uring::types::Fd(fd), self.eventfd_read_dst, 8) .build() .user_data(EVENTFD_USERDATA); @@ -169,12 +170,13 @@ impl IoUringDriver { inner.eventfd_installed = true; } + #[cfg(not(feature = "enter-args"))] fn install_timeout(&self, inner: &mut UringInner, duration: Duration) { let timespec = timespec(duration); unsafe { std::ptr::replace(self.timespec, timespec); } - let entry = opcode::Timeout::new(self.timespec as *const Timespec) + let entry = io_uring::opcode::Timeout::new(self.timespec as *const Timespec) .build() .user_data(TIMEOUT_USERDATA); @@ -233,11 +235,30 @@ impl IoUringDriver { self.install_eventfd(inner, inner.shared_waker.as_raw_fd()); } if let Some(duration) = timeout { - self.install_timeout(inner, duration); - } + // Submit and Wait with timeout in an TimeoutOp way. + // Better compatibility(5.4+). + #[cfg(not(feature = "enter-args"))] + { + self.install_timeout(inner, duration); + inner.uring.submit_and_wait(1)?; + } - // Submit and Wait - inner.uring.submit_and_wait(1)?; + // Submit and Wait with enter args. + // Better performance(5.11+). + #[cfg(feature = "enter-args")] + { + let timespec = timespec(duration); + let args = io_uring::types::SubmitArgs::new().timespec(×pec); + if let Err(e) = inner.uring.submitter().submit_with_args(1, &args) { + if e.raw_os_error() != Some(libc::ETIME) { + return Err(e); + } + } + } + } else { + // Submit and Wait without timeout + inner.uring.submit_and_wait(1)?; + } } else { // Submit only inner.uring.submit()?;