From 2f4768477838904cd19e36da83ecf57f0f74e0d1 Mon Sep 17 00:00:00 2001 From: Youseok Yang Date: Sun, 4 Feb 2024 17:37:54 +0900 Subject: [PATCH] Add Submit trait and Link struct for linked operations --- Cargo.toml | 1 + examples/cat.rs | 2 +- examples/mix.rs | 2 +- examples/tcp_stream.rs | 2 +- examples/unix_listener.rs | 2 +- examples/unix_stream.rs | 2 +- examples/wrk-bench.rs | 1 + src/fs/file.rs | 7 ++- src/io/socket.rs | 2 +- src/lib.rs | 8 ++- src/net/tcp/listener.rs | 1 + src/net/tcp/stream.rs | 1 + src/net/udp.rs | 1 + src/net/unix/listener.rs | 1 + src/net/unix/stream.rs | 1 + src/runtime/driver/op/link.rs | 92 +++++++++++++++++++++++++++++++++++ src/runtime/driver/op/mod.rs | 45 +++++++++++++---- tests/driver.rs | 2 +- tests/fs_file.rs | 55 ++++++++++----------- 19 files changed, 181 insertions(+), 47 deletions(-) create mode 100644 src/runtime/driver/op/link.rs diff --git a/Cargo.toml b/Cargo.toml index 8bdec82e..5d41d356 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ io-uring = "0.6.0" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } futures-util = { version = "0.3.26", default-features = false, features = ["std"] } +pin-project-lite = "0.2.13" [dev-dependencies] tempfile = "3.2.0" diff --git a/examples/cat.rs b/examples/cat.rs index 33d02e38..bea33d7e 100644 --- a/examples/cat.rs +++ b/examples/cat.rs @@ -3,7 +3,7 @@ use std::{ {env, io}, }; -use tokio_uring::fs::File; +use tokio_uring::{fs::File, Submit}; fn main() { // The file to `cat` is passed as a CLI argument diff --git a/examples/mix.rs b/examples/mix.rs index aaad7f60..d2513b15 100644 --- a/examples/mix.rs +++ b/examples/mix.rs @@ -4,7 +4,7 @@ use std::env; -use tokio_uring::{fs::File, net::TcpListener}; +use tokio_uring::{fs::File, net::TcpListener, Submit}; fn main() { // The file to serve over TCP is passed as a CLI argument diff --git a/examples/tcp_stream.rs b/examples/tcp_stream.rs index 4983ee4c..736ddc18 100644 --- a/examples/tcp_stream.rs +++ b/examples/tcp_stream.rs @@ -1,6 +1,6 @@ use std::{env, net::SocketAddr}; -use tokio_uring::net::TcpStream; +use tokio_uring::{net::TcpStream, Submit}; fn main() { let args: Vec<_> = env::args().collect(); diff --git a/examples/unix_listener.rs b/examples/unix_listener.rs index 9e10496d..f4fc7e0a 100644 --- a/examples/unix_listener.rs +++ b/examples/unix_listener.rs @@ -1,6 +1,6 @@ use std::env; -use tokio_uring::net::UnixListener; +use tokio_uring::{net::UnixListener, Submit}; fn main() { let args: Vec<_> = env::args().collect(); diff --git a/examples/unix_stream.rs b/examples/unix_stream.rs index 7caf06f9..d02b8e81 100644 --- a/examples/unix_stream.rs +++ b/examples/unix_stream.rs @@ -1,6 +1,6 @@ use std::env; -use tokio_uring::net::UnixStream; +use tokio_uring::{net::UnixStream, Submit}; fn main() { let args: Vec<_> = env::args().collect(); diff --git a/examples/wrk-bench.rs b/examples/wrk-bench.rs index 222df76a..e9cb337e 100644 --- a/examples/wrk-bench.rs +++ b/examples/wrk-bench.rs @@ -1,6 +1,7 @@ use std::io; use std::rc::Rc; use tokio::task::JoinHandle; +use tokio_uring::Submit; pub const RESPONSE: &'static [u8] = b"HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: 12\n\nHello world!"; diff --git a/src/fs/file.rs b/src/fs/file.rs index 0582388a..8c074bf7 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -3,7 +3,7 @@ use crate::buf::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut, Slice}; use crate::fs::OpenOptions; use crate::io::SharedFd; -use crate::runtime::driver::op::Op; +use crate::runtime::driver::op::{Op, Submit}; use crate::{UnsubmittedOneshot, UnsubmittedRead, UnsubmittedWrite}; use std::fmt; use std::io; @@ -32,6 +32,7 @@ use std::path::Path; /// /// ```no_run /// use tokio_uring::fs::File; +/// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { @@ -158,6 +159,7 @@ impl File { /// /// ```no_run /// use tokio_uring::fs::File; + /// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { @@ -518,6 +520,7 @@ impl File { /// /// ```no_run /// use tokio_uring::fs::File; + /// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { @@ -767,6 +770,7 @@ impl File { /// /// ```no_run /// use tokio_uring::fs::File; + /// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { @@ -804,6 +808,7 @@ impl File { /// /// ```no_run /// use tokio_uring::fs::File; + /// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { diff --git a/src/io/socket.rs b/src/io/socket.rs index 081eaf00..19acc7dc 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -1,5 +1,5 @@ use crate::io::write::UnsubmittedWrite; -use crate::runtime::driver::op::Op; +use crate::runtime::driver::op::{Op, Submit}; use crate::{ buf::fixed::FixedBuf, buf::{BoundedBuf, BoundedBufMut, IoBuf, Slice}, diff --git a/src/lib.rs b/src/lib.rs index e75b5803..13eb4a4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ //! //! ```no_run //! use tokio_uring::fs::File; +//! use tokio_uring::Submit; //! //! fn main() -> Result<(), Box> { //! tokio_uring::start(async { @@ -80,7 +81,10 @@ pub mod net; pub use io::read::*; pub use io::write::*; -pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot}; +pub use runtime::driver::op::{ + InFlightOneshot, Link, LinkedInFlightOneshot, OneshotOutputTransform, Submit, + UnsubmittedOneshot, +}; pub use runtime::spawn; pub use runtime::Runtime; @@ -106,6 +110,7 @@ use std::future::Future; /// /// ```no_run /// use tokio_uring::fs::File; +/// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { @@ -245,6 +250,7 @@ impl Builder { /// /// ```no_run /// use tokio_uring::fs::File; +/// use tokio_uring::Submit; /// /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 2435c61b..d0d070a5 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -16,6 +16,7 @@ use std::{ /// ``` /// use tokio_uring::net::TcpListener; /// use tokio_uring::net::TcpStream; +/// use tokio_uring::Submit; /// /// let listener = TcpListener::bind("127.0.0.1:2345".parse().unwrap()).unwrap(); /// diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 2450dcb9..906845c6 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -20,6 +20,7 @@ use crate::{ /// /// ```no_run /// use tokio_uring::net::TcpStream; +/// use tokio_uring::Submit; /// use std::net::ToSocketAddrs; /// /// fn main() -> std::io::Result<()> { diff --git a/src/net/udp.rs b/src/net/udp.rs index cb0cef66..28932fbe 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -26,6 +26,7 @@ use std::{ /// /// ``` /// use tokio_uring::net::UdpSocket; +/// use tokio_uring::Submit; /// use std::net::SocketAddr; /// fn main() -> std::io::Result<()> { /// tokio_uring::start(async { diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index ffabb5d2..f7757a52 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -12,6 +12,7 @@ use std::{io, path::Path}; /// ``` /// use tokio_uring::net::UnixListener; /// use tokio_uring::net::UnixStream; +/// use tokio_uring::Submit; /// /// let sock_file = "/tmp/tokio-uring-unix-test.sock"; /// let listener = UnixListener::bind(&sock_file).unwrap(); diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index 40e7ddc5..bc6a608f 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -20,6 +20,7 @@ use std::{ /// /// ```no_run /// use tokio_uring::net::UnixStream; +/// use tokio_uring::Submit; /// use std::net::ToSocketAddrs; /// /// fn main() -> std::io::Result<()> { diff --git a/src/runtime/driver/op/link.rs b/src/runtime/driver/op/link.rs new file mode 100644 index 00000000..1ea3fdd7 --- /dev/null +++ b/src/runtime/driver/op/link.rs @@ -0,0 +1,92 @@ +use io_uring::squeue::Flags; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::{OneshotOutputTransform, Submit, UnsubmittedOneshot}; + +/// A Link struct to represent linked operations. +pub struct Link { + data: D, + next: N, +} + +impl Link { + /// Construct a new Link with actual data and next node (Link or UnsubmittedOneshot). + pub fn new(data: D, next: N) -> Self { + Self { data, next } + } +} + +impl Submit for Link +where + D: Submit, + N: Submit, +{ + type Output = LinkedInFlightOneshot; + + fn submit(self) -> Self::Output { + LinkedInFlightOneshot { + data: self.data.submit(), + next: Some(self.next.submit()), + } + } +} + +impl, N> Link, N> { + /// Construct a new soft Link with current Link and other UnsubmittedOneshot. + pub fn link>( + self, + other: UnsubmittedOneshot, + ) -> Link, Link>> { + Link { + data: self.data.set_flags(Flags::IO_LINK), + next: Link { + data: self.next, + next: other, + }, + } + } + + /// Construct a new hard Link with current Link and other UnsubmittedOneshot. + pub fn hard_link>( + self, + other: UnsubmittedOneshot, + ) -> Link, Link>> { + Link { + data: self.data.set_flags(Flags::IO_HARDLINK), + next: Link { + data: self.next, + next: other, + }, + } + } +} + +pin_project! { + /// An in-progress linked oneshot operations which can be polled for completion. + pub struct LinkedInFlightOneshot { + #[pin] + data: D, + next: Option, + } +} + +impl Future for LinkedInFlightOneshot +where + D: Future, +{ + type Output = (D::Output, N); // Will return actual output and next linked future. + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let output = ready!(this.data.poll(cx)); + let next = this.next.take().unwrap(); + + Poll::Ready((output, next)) + } +} diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 8d3559b0..a33c9e58 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -7,8 +7,10 @@ use std::task::{Context, Poll, Waker}; use io_uring::squeue::Flags; use io_uring::{cqueue, squeue}; +mod link; mod slab_list; +pub use link::{Link, LinkedInFlightOneshot}; use slab::Slab; use slab_list::{SlabListEntry, SlabListIndices}; @@ -38,29 +40,43 @@ impl> UnsubmittedOneshot { } } + /// Link two UnsubmittedOneshots. + pub fn link>( + self, + other: UnsubmittedOneshot, + ) -> Link, UnsubmittedOneshot> { + Link::new(self.set_flags(Flags::IO_LINK), other) + } + + /// Hard-link two UnsubmittedOneshots. + pub fn hard_link>( + self, + other: UnsubmittedOneshot, + ) -> Link, UnsubmittedOneshot> { + Link::new(self.set_flags(Flags::IO_HARDLINK), other) + } + /// Set the SQE's flags. - pub fn set_flags(mut self, flags: Flags) -> Self { + pub(crate) fn set_flags(mut self, flags: Flags) -> Self { self.sqe = self.sqe.flags(flags); self } +} + +impl> Submit for UnsubmittedOneshot { + type Output = InFlightOneshot; /// Submit an operation to the driver for batched entry to the kernel. - pub fn submit(self) -> InFlightOneshot { + fn submit(self) -> Self::Output { let handle = CONTEXT .with(|x| x.handle()) .expect("Could not submit op; not in runtime context"); - self.submit_with_driver(&handle) - } - - fn submit_with_driver(self, driver: &driver::Handle) -> InFlightOneshot { - let index = driver.submit_op_2(self.sqe); - - let driver = driver.into(); + let index = handle.submit_op_2(self.sqe); let inner = InFlightOneshotInner { index, - driver, + driver: (&handle).into(), stable_data: self.stable_data, post_op: self.post_op, }; @@ -121,6 +137,15 @@ impl> Drop for InFlightOne } } +/// Submit an operation or operations to the driver. +pub trait Submit { + /// The output of the submission with an in-flight operation or linked in-flight operations. + type Output; + + /// Submit an operation or linked operations. + fn submit(self) -> Self::Output; +} + /// Transforms the output of a oneshot operation into a more user-friendly format. pub trait OneshotOutputTransform { /// The final output after the transformation. diff --git a/tests/driver.rs b/tests/driver.rs index a123aa27..a0040ba7 100644 --- a/tests/driver.rs +++ b/tests/driver.rs @@ -1,6 +1,6 @@ use tempfile::NamedTempFile; -use tokio_uring::{buf::IoBuf, fs::File}; +use tokio_uring::{buf::IoBuf, fs::File, Submit}; #[path = "../src/future.rs"] #[allow(warnings)] diff --git a/tests/fs_file.rs b/tests/fs_file.rs index ab3172f2..569e8b15 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -7,9 +7,9 @@ use libc; use tempfile::NamedTempFile; -use tokio_uring::buf::fixed::FixedBufRegistry; use tokio_uring::buf::{BoundedBuf, BoundedBufMut}; use tokio_uring::fs::File; +use tokio_uring::{buf::fixed::FixedBufRegistry, Submit}; #[path = "../src/future.rs"] #[allow(warnings)] @@ -316,51 +316,50 @@ fn basic_fallocate() { } #[test] -fn write_linked() { +fn read_linked() { tokio_uring::start(async { - let tempfile = tempfile(); - let file = File::create(tempfile.path()).await.unwrap(); + let mut tempfile = tempfile(); + let file = File::open(tempfile.path()).await.unwrap(); - let write1 = file - .write_at(HELLO, 0) - .set_flags(io_uring::squeue::Flags::IO_LINK) - .submit(); - let write2 = file.write_at(HELLO, HELLO.len() as u64).submit(); + tempfile.write_all(&[HELLO, HELLO].concat()).unwrap(); + + let buf1 = Vec::with_capacity(HELLO.len()); + let buf2 = Vec::with_capacity(HELLO.len()); + + let read1 = file.read_at(buf1, 0); + let read2 = file.read_at(buf2, HELLO.len() as u64); + + let future1 = read1.link(read2).submit(); + + let (res1, future2) = future1.await; + let res2 = future2.await; - let res1 = write1.await; - let res2 = write2.await; res1.0.unwrap(); res2.0.unwrap(); - let file = std::fs::read(tempfile.path()).unwrap(); - assert_eq!(file, [HELLO, HELLO].concat()); + assert_eq!([HELLO, HELLO].concat(), [res1.1, res2.1].concat()); }); } #[test] -fn read_linked() { +fn write_linked() { tokio_uring::start(async { - let mut tempfile = tempfile(); - let file = File::open(tempfile.path()).await.unwrap(); - - tempfile.write_all(&[HELLO, HELLO].concat()).unwrap(); + let tempfile = tempfile(); + let file = File::create(tempfile.path()).await.unwrap(); - let buf1 = Vec::with_capacity(HELLO.len()); - let buf2 = Vec::with_capacity(HELLO.len()); + let write1 = file.write_at(HELLO, 0); + let write2 = file.write_at(HELLO, HELLO.len() as u64); - let read1 = file - .read_at(buf1, 0) - .set_flags(io_uring::squeue::Flags::IO_LINK) - .submit(); - let read2 = file.read_at(buf2, HELLO.len() as u64).submit(); + let future1 = write1.link(write2).submit(); - let res1 = read1.await; - let res2 = read2.await; + let (res1, future2) = future1.await; + let res2 = future2.await; res1.0.unwrap(); res2.0.unwrap(); - assert_eq!([HELLO, HELLO].concat(), [res1.1, res2.1].concat()); + let file = std::fs::read(tempfile.path()).unwrap(); + assert_eq!(file, [HELLO, HELLO].concat()); }); }