diff --git a/Cargo.toml b/Cargo.toml index 81ac4a7..701b1d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,16 +29,13 @@ concurrent-queue = "2.2.0" futures-io = { version = "0.3.28", default-features = false, features = ["std"] } futures-lite = { version = "1.11.0", default-features = false } parking = "2.0.0" -polling = "2.6.0" +polling = "3.0.0" rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] } slab = "0.4.2" socket2 = { version = "0.5.3", features = ["all"] } tracing = { version = "0.1.37", default-features = false } waker-fn = "1.1.0" -[build-dependencies] -autocfg = "1" - [dev-dependencies] async-channel = "1" async-net = "1" @@ -54,3 +51,6 @@ timerfd = "1" [target.'cfg(windows)'.dev-dependencies] uds_windows = "1" + +[patch.crates-io] +async-io = { path = "." } diff --git a/build.rs b/build.rs deleted file mode 100644 index 8292525..0000000 --- a/build.rs +++ /dev/null @@ -1,16 +0,0 @@ -fn main() { - let cfg = match autocfg::AutoCfg::new() { - Ok(cfg) => cfg, - Err(e) => { - println!( - "cargo:warning=async-io: failed to detect compiler features: {}", - e - ); - return; - } - }; - - if !cfg.probe_rustc_version(1, 63) { - autocfg::emit("async_io_no_io_safety"); - } -} diff --git a/examples/linux-inotify.rs b/examples/linux-inotify.rs index 376aff4..121b482 100644 --- a/examples/linux-inotify.rs +++ b/examples/linux-inotify.rs @@ -37,17 +37,21 @@ fn main() -> std::io::Result<()> { future::block_on(async { // Watch events in the current directory. let mut inotify = Async::new(Inotify::init()?)?; - inotify - .get_mut() - .watches() - .add(".", WatchMask::ALL_EVENTS)?; + + // SAFETY: We do not move the inner file descriptor out. + unsafe { + inotify + .get_mut() + .watches() + .add(".", WatchMask::ALL_EVENTS)?; + } println!("Watching for filesystem events in the current directory..."); println!("Try opening a file to trigger some events."); println!(); // Wait for events in a loop and print them on the screen. loop { - for event in inotify.read_with_mut(read_op).await? { + for event in unsafe { inotify.read_with_mut(read_op).await? } { println!("{:?}", event); } } diff --git a/examples/windows-uds.rs b/examples/windows-uds.rs index 0980d1b..0f1bf04 100644 --- a/examples/windows-uds.rs +++ b/examples/windows-uds.rs @@ -8,22 +8,88 @@ #[cfg(windows)] fn main() -> std::io::Result<()> { + use std::ops::Deref; + use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket}; use std::path::PathBuf; use async_io::Async; use blocking::Unblock; - use futures_lite::{future, io, prelude::*}; + use futures_lite::{future, prelude::*}; + use std::io; use tempfile::tempdir; - use uds_windows::{UnixListener, UnixStream}; + + // n.b.: notgull: uds_windows does not support I/O safety uet, hence the wrapper types + + struct UnixListener(uds_windows::UnixListener); + + impl From for UnixListener { + fn from(ul: uds_windows::UnixListener) -> Self { + Self(ul) + } + } + + impl Deref for UnixListener { + type Target = uds_windows::UnixListener; + + fn deref(&self) -> &uds_windows::UnixListener { + &self.0 + } + } + + impl AsSocket for UnixListener { + fn as_socket(&self) -> BorrowedSocket<'_> { + unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) } + } + } + + struct UnixStream(uds_windows::UnixStream); + + impl From for UnixStream { + fn from(ul: uds_windows::UnixStream) -> Self { + Self(ul) + } + } + + impl Deref for UnixStream { + type Target = uds_windows::UnixStream; + + fn deref(&self) -> &uds_windows::UnixStream { + &self.0 + } + } + + impl AsSocket for UnixStream { + fn as_socket(&self) -> BorrowedSocket<'_> { + unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) } + } + } + + impl io::Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + io::Read::read(&mut self.0, buf) + } + } + + impl io::Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + io::Write::write(&mut self.0, buf) + } + + fn flush(&mut self) -> io::Result<()> { + io::Write::flush(&mut self.0) + } + } + + unsafe impl async_io::IoSafe for UnixStream {} async fn client(addr: PathBuf) -> io::Result<()> { // Connect to the address. - let stream = Async::new(UnixStream::connect(addr)?)?; + let stream = Async::new(UnixStream::from(uds_windows::UnixStream::connect(addr)?))?; println!("Connected to {:?}", stream.get_ref().peer_addr()?); // Pipe the stream to stdout. let mut stdout = Unblock::new(std::io::stdout()); - io::copy(&stream, &mut stdout).await?; + futures_lite::io::copy(stream, &mut stdout).await?; Ok(()) } @@ -32,7 +98,7 @@ fn main() -> std::io::Result<()> { future::block_on(async { // Create a listener. - let listener = Async::new(UnixListener::bind(&path)?)?; + let listener = Async::new(UnixListener::from(uds_windows::UnixListener::bind(&path)?))?; println!("Listening on {:?}", listener.get_ref().local_addr()?); future::try_zip( @@ -42,7 +108,9 @@ fn main() -> std::io::Result<()> { println!("Accepted a client"); // Send a message, drop the stream, and wait for the client. - Async::new(stream)?.write_all(b"Hello!\n").await?; + Async::new(UnixStream::from(stream))? + .write_all(b"Hello!\n") + .await?; Ok(()) }, client(path), diff --git a/src/lib.rs b/src/lib.rs index 3aa368c..fd671b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,26 +70,22 @@ use std::sync::Arc; use std::task::{Context, Poll, Waker}; use std::time::{Duration, Instant}; -#[cfg(all(not(async_io_no_io_safety), unix))] -use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; #[cfg(unix)] use std::{ - os::unix::io::{AsRawFd, RawFd}, + os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}, os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream}, path::Path, }; #[cfg(windows)] -use std::os::windows::io::{AsRawSocket, RawSocket}; -#[cfg(all(not(async_io_no_io_safety), windows))] -use std::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; +use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket}; use futures_io::{AsyncRead, AsyncWrite}; use futures_lite::stream::{self, Stream}; use futures_lite::{future, pin, ready}; use socket2::{Domain, Protocol, SockAddr, Socket, Type}; -use crate::reactor::{Reactor, Source}; +use crate::reactor::{Reactor, Registration, Source}; mod driver; mod reactor; @@ -539,8 +535,15 @@ impl Stream for Timer { /// For higher-level primitives built on top of [`Async`], look into [`async-net`] or /// [`async-process`] (on Unix). /// +/// The most notable caveat is that it is unsafe to access the inner I/O source mutably +/// using this primitive. Traits likes [`AsyncRead`] and [`AsyncWrite`] are not implemented by +/// default unless it is guaranteed that the resource won't be invalidated by reading or writing. +/// See the [`IoSafe`] trait for more information. +/// /// [`async-net`]: https://github.com/smol-rs/async-net /// [`async-process`]: https://github.com/smol-rs/async-process +/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html +/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html /// /// ### Supported types /// @@ -625,7 +628,7 @@ pub struct Async { impl Unpin for Async {} #[cfg(unix)] -impl Async { +impl Async { /// Creates an async I/O handle. /// /// This method will put the handle in non-blocking mode and register it in @@ -651,14 +654,8 @@ impl Async { /// # std::io::Result::Ok(()) }); /// ``` pub fn new(io: T) -> io::Result> { - let raw = io.as_raw_fd(); - // Put the file descriptor in non-blocking mode. - // - // Safety: We assume `as_raw_fd()` returns a valid fd. When we can - // depend on Rust >= 1.63, where `AsFd` is stabilized, and when - // `TimerFd` implements it, we can remove this unsafe and simplify this. - let fd = unsafe { rustix::fd::BorrowedFd::borrow_raw(raw) }; + let fd = io.as_fd(); cfg_if::cfg_if! { // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux // for now, as with the standard library, because it seems to behave @@ -677,8 +674,12 @@ impl Async { } } + // SAFETY: It is impossible to drop the I/O source while it is registered through + // this type. + let registration = unsafe { Registration::new(fd) }; + Ok(Async { - source: Reactor::get().insert_io(raw)?, + source: Reactor::get().insert_io(registration)?, io: Some(io), }) } @@ -691,15 +692,15 @@ impl AsRawFd for Async { } } -#[cfg(all(not(async_io_no_io_safety), unix))] +#[cfg(unix)] impl AsFd for Async { fn as_fd(&self) -> BorrowedFd<'_> { self.get_ref().as_fd() } } -#[cfg(all(not(async_io_no_io_safety), unix))] -impl> TryFrom for Async { +#[cfg(unix)] +impl> TryFrom for Async { type Error = io::Error; fn try_from(value: OwnedFd) -> Result { @@ -707,7 +708,7 @@ impl> TryFrom for Async { } } -#[cfg(all(not(async_io_no_io_safety), unix))] +#[cfg(unix)] impl> TryFrom> for OwnedFd { type Error = io::Error; @@ -717,7 +718,7 @@ impl> TryFrom> for OwnedFd { } #[cfg(windows)] -impl Async { +impl Async { /// Creates an async I/O handle. /// /// This method will put the handle in non-blocking mode and register it in @@ -743,8 +744,7 @@ impl Async { /// # std::io::Result::Ok(()) }); /// ``` pub fn new(io: T) -> io::Result> { - let sock = io.as_raw_socket(); - let borrowed = unsafe { rustix::fd::BorrowedFd::borrow_raw(sock) }; + let borrowed = io.as_socket(); // Put the socket in non-blocking mode. // @@ -753,8 +753,14 @@ impl Async { // `TimerFd` implements it, we can remove this unsafe and simplify this. rustix::io::ioctl_fionbio(borrowed, true)?; + // Create the registration. + // + // SAFETY: It is impossible to drop the I/O source while it is registered through + // this type. + let registration = unsafe { Registration::new(borrowed) }; + Ok(Async { - source: Reactor::get().insert_io(sock)?, + source: Reactor::get().insert_io(registration)?, io: Some(io), }) } @@ -767,15 +773,15 @@ impl AsRawSocket for Async { } } -#[cfg(all(not(async_io_no_io_safety), windows))] +#[cfg(windows)] impl AsSocket for Async { fn as_socket(&self) -> BorrowedSocket<'_> { self.get_ref().as_socket() } } -#[cfg(all(not(async_io_no_io_safety), windows))] -impl> TryFrom for Async { +#[cfg(windows)] +impl> TryFrom for Async { type Error = io::Error; fn try_from(value: OwnedSocket) -> Result { @@ -783,7 +789,7 @@ impl> TryFrom for Async { } } -#[cfg(all(not(async_io_no_io_safety), windows))] +#[cfg(windows)] impl> TryFrom> for OwnedSocket { type Error = io::Error; @@ -812,6 +818,10 @@ impl Async { /// Gets a mutable reference to the inner I/O handle. /// + /// # Safety + /// + /// The underlying I/O source must not be dropped using this function. + /// /// # Examples /// /// ``` @@ -820,10 +830,10 @@ impl Async { /// /// # futures_lite::future::block_on(async { /// let mut listener = Async::::bind(([127, 0, 0, 1], 0))?; - /// let inner = listener.get_mut(); + /// let inner = unsafe { listener.get_mut() }; /// # std::io::Result::Ok(()) }); /// ``` - pub fn get_mut(&mut self) -> &mut T { + pub unsafe fn get_mut(&mut self) -> &mut T { self.io.as_mut().unwrap() } @@ -1013,6 +1023,10 @@ impl Async { /// /// The closure receives a mutable reference to the I/O handle. /// + /// # Safety + /// + /// In the closure, the underlying I/O source must not be dropped. + /// /// # Examples /// /// ```no_run @@ -1023,10 +1037,10 @@ impl Async { /// let mut listener = Async::::bind(([127, 0, 0, 1], 0))?; /// /// // Accept a new client asynchronously. - /// let (stream, addr) = listener.read_with_mut(|l| l.accept()).await?; + /// let (stream, addr) = unsafe { listener.read_with_mut(|l| l.accept()).await? }; /// # std::io::Result::Ok(()) }); /// ``` - pub async fn read_with_mut( + pub async unsafe fn read_with_mut( &mut self, op: impl FnMut(&mut T) -> io::Result, ) -> io::Result { @@ -1081,7 +1095,10 @@ impl Async { /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS /// sends a notification that the I/O handle is writable. /// - /// The closure receives a mutable reference to the I/O handle. + /// # Safety + /// + /// The closure receives a mutable reference to the I/O handle. In the closure, the underlying + /// I/O source must not be dropped. /// /// # Examples /// @@ -1094,10 +1111,10 @@ impl Async { /// socket.get_ref().connect("127.0.0.1:9000")?; /// /// let msg = b"hello"; - /// let len = socket.write_with_mut(|s| s.send(msg)).await?; + /// let len = unsafe { socket.write_with_mut(|s| s.send(msg)).await? }; /// # std::io::Result::Ok(()) }); /// ``` - pub async fn write_with_mut( + pub async unsafe fn write_with_mut( &mut self, op: impl FnMut(&mut T) -> io::Result, ) -> io::Result { @@ -1118,12 +1135,6 @@ impl AsRef for Async { } } -impl AsMut for Async { - fn as_mut(&mut self) -> &mut T { - self.get_mut() - } -} - impl Drop for Async { fn drop(&mut self) { if self.io.is_some() { @@ -1136,14 +1147,102 @@ impl Drop for Async { } } -impl AsyncRead for Async { +/// Types whose I/O trait implementations do not drop the underlying I/O source. +/// +/// The resource contained inside of the [`Async`] cannot be invalidated. This invalidation can +/// happen if the inner resource (the [`TcpStream`], [`UnixListener`] or other `T`) is moved out +/// and dropped before the [`Async`]. Because of this, functions that grant mutable access to +/// the inner type are unsafe, as there is no way to guarantee that the source won't be dropped +/// and a dangling handle won't be left behind. +/// +/// Unfortunately this extends to implementations of [`Read`] and [`Write`]. Since methods on those +/// traits take `&mut`, there is no guarantee that the implementor of those traits won't move the +/// source out while the method is being run. +/// +/// This trait is an antidote to this predicament. By implementing this trait, the user pledges +/// that using any I/O traits won't destroy the source. This way, [`Async`] can implement the +/// `async` version of these I/O traits, like [`AsyncRead`] and [`AsyncWrite`]. +/// +/// # Safety +/// +/// Any I/O trait implementations for this type must not drop the underlying I/O source. Traits +/// affected by this trait include [`Read`], [`Write`], [`Seek`] and [`BufRead`]. +/// +/// This trait is implemented by default on top of `libstd` types. In addition, it is implemented +/// for immutable reference types, as it is impossible to invalidate any outstanding references +/// while holding an immutable reference, even with interior mutability. As Rust's current pinning +/// system relies on similar guarantees, I believe that this approach is robust. +/// +/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html +/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html +/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html +/// +/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html +/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html +pub unsafe trait IoSafe {} + +/// Reference types can't be mutated. +/// +/// The worst thing that can happen is that external state is used to change what kind of pointer +/// `as_fd()` returns. For instance: +/// +/// ``` +/// # #[cfg(unix)] { +/// use std::cell::Cell; +/// use std::net::TcpStream; +/// use std::os::unix::io::{AsFd, BorrowedFd}; +/// +/// struct Bar { +/// flag: Cell, +/// a: TcpStream, +/// b: TcpStream +/// } +/// +/// impl AsFd for Bar { +/// fn as_fd(&self) -> BorrowedFd<'_> { +/// if self.flag.replace(!self.flag.get()) { +/// self.a.as_fd() +/// } else { +/// self.b.as_fd() +/// } +/// } +/// } +/// # } +/// ``` +/// +/// We solve this problem by only calling `as_fd()` once to get the original source. Implementations +/// like this are considered buggy (but not unsound) and are thus not really supported by `async-io`. +unsafe impl IoSafe for &T {} + +// Can be implemented on top of libstd types. +unsafe impl IoSafe for std::fs::File {} +unsafe impl IoSafe for std::io::Stderr {} +unsafe impl IoSafe for std::io::Stdin {} +unsafe impl IoSafe for std::io::Stdout {} +unsafe impl IoSafe for std::io::StderrLock<'_> {} +unsafe impl IoSafe for std::io::StdinLock<'_> {} +unsafe impl IoSafe for std::io::StdoutLock<'_> {} +unsafe impl IoSafe for std::net::TcpStream {} + +#[cfg(unix)] +unsafe impl IoSafe for std::os::unix::net::UnixStream {} + +unsafe impl IoSafe for std::io::BufReader {} +unsafe impl IoSafe for std::io::BufWriter {} +unsafe impl IoSafe for std::io::LineWriter {} +unsafe impl IoSafe for &mut T {} +unsafe impl IoSafe for Box {} +unsafe impl IoSafe for std::borrow::Cow<'_, T> {} + +impl AsyncRead for Async { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { loop { - match (*self).get_mut().read(buf) { + match unsafe { (*self).get_mut() }.read(buf) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } @@ -1157,7 +1256,7 @@ impl AsyncRead for Async { bufs: &mut [IoSliceMut<'_>], ) -> Poll> { loop { - match (*self).get_mut().read_vectored(bufs) { + match unsafe { (*self).get_mut() }.read_vectored(bufs) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } @@ -1166,6 +1265,8 @@ impl AsyncRead for Async { } } +// Since this is through a reference, we can't mutate the inner I/O source. +// Therefore this is safe! impl AsyncRead for &Async where for<'a> &'a T: Read, @@ -1199,14 +1300,14 @@ where } } -impl AsyncWrite for Async { +impl AsyncWrite for Async { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { loop { - match (*self).get_mut().write(buf) { + match unsafe { (*self).get_mut() }.write(buf) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } @@ -1220,7 +1321,7 @@ impl AsyncWrite for Async { bufs: &[IoSlice<'_>], ) -> Poll> { loop { - match (*self).get_mut().write_vectored(bufs) { + match unsafe { (*self).get_mut() }.write_vectored(bufs) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } @@ -1230,7 +1331,7 @@ impl AsyncWrite for Async { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match (*self).get_mut().flush() { + match unsafe { (*self).get_mut() }.flush() { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} res => return Poll::Ready(res), } diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index 78b026e..36d543b 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -8,14 +8,11 @@ use crate::Async; use std::convert::{TryFrom, TryInto}; use std::future::Future; use std::io::{Error, Result}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; use std::pin::Pin; use std::process::Child; use std::task::{Context, Poll}; -#[cfg(not(async_io_no_io_safety))] -use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; - /// A wrapper around a queueable object that waits until it is ready. /// /// The underlying `kqueue` implementation can be used to poll for events besides file descriptor @@ -34,7 +31,7 @@ impl AsRef for Filter { impl AsMut for Filter { fn as_mut(&mut self) -> &mut T { - self.0.as_mut() + self.get_mut() } } @@ -72,15 +69,13 @@ impl AsRawFd for Filter { } } -#[cfg(not(async_io_no_io_safety))] impl AsFd for Filter { fn as_fd(&self) -> BorrowedFd<'_> { self.0.as_fd() } } -#[cfg(not(async_io_no_io_safety))] -impl> TryFrom for Filter { +impl> TryFrom for Filter { type Error = Error; fn try_from(fd: OwnedFd) -> Result { @@ -88,7 +83,6 @@ impl> TryFrom for Filter { } } -#[cfg(not(async_io_no_io_safety))] impl> TryFrom> for OwnedFd { type Error = Error; @@ -117,6 +111,9 @@ impl Filter { /// Gets a mutable reference to the underlying [`Queueable`] object. /// + /// Unlike in [`Async`], this method is safe to call, since dropping the [`Filter`] will + /// not cause any undefined behavior. + /// /// # Examples /// /// ``` @@ -129,7 +126,7 @@ impl Filter { /// # }); /// ``` pub fn get_mut(&mut self) -> &mut T { - self.0.get_mut() + unsafe { self.0.get_mut() } } /// Unwraps the inner [`Queueable`] object. diff --git a/src/reactor.rs b/src/reactor.rs index 3e39534..d6f4ad9 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -15,7 +15,7 @@ use std::time::{Duration, Instant}; use async_lock::OnceCell; use concurrent_queue::ConcurrentQueue; use futures_lite::ready; -use polling::{Event, Poller}; +use polling::{Event, Events, Poller}; use slab::Slab; // Choose the proper implementation of `Registration` based on the target platform. @@ -77,7 +77,7 @@ pub(crate) struct Reactor { /// Temporary storage for I/O events when polling the reactor. /// /// Holding a lock on this event list implies the exclusive right to poll I/O. - events: Mutex>, + events: Mutex, /// An ordered map of registered timers. /// @@ -104,7 +104,7 @@ impl Reactor { poller: Poller::new().expect("cannot initialize I/O event notification"), ticker: AtomicUsize::new(0), sources: Mutex::new(Slab::new()), - events: Mutex::new(Vec::new()), + events: Mutex::new(Events::new()), timers: Mutex::new(BTreeMap::new()), timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE), } @@ -117,13 +117,13 @@ impl Reactor { } /// Registers an I/O source in the reactor. - pub(crate) fn insert_io(&self, raw: impl Into) -> io::Result> { + pub(crate) fn insert_io(&self, raw: Registration) -> io::Result> { // Create an I/O source for this file descriptor. let source = { let mut sources = self.sources.lock().unwrap(); let key = sources.vacant_entry().key(); let source = Arc::new(Source { - registration: raw.into(), + registration: raw, key, state: Default::default(), }); @@ -268,7 +268,7 @@ impl Reactor { /// A lock on the reactor. pub(crate) struct ReactorLock<'a> { reactor: &'a Reactor, - events: MutexGuard<'a, Vec>, + events: MutexGuard<'a, Events>, } impl ReactorLock<'_> { @@ -331,14 +331,16 @@ impl ReactorLock<'_> { // e.g. we were previously interested in both readability and writability, // but only one of them was emitted. if !state[READ].is_empty() || !state[WRITE].is_empty() { - source.registration.modify( - &self.reactor.poller, - Event { - key: source.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(source.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Register interest in this event. + source.registration.modify(&self.reactor.poller, event)?; } } } @@ -463,14 +465,16 @@ impl Source { // Update interest in this I/O handle. if was_empty { - self.registration.modify( - &Reactor::get().poller, - Event { - key: self.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(self.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Register interest in it. + self.registration.modify(&Reactor::get().poller, event)?; } Poll::Pending @@ -637,14 +641,20 @@ impl> + Clone, T> Future for Ready { // Update interest in this I/O handle. if was_empty { - handle.borrow().source.registration.modify( - &Reactor::get().poller, - Event { - key: handle.borrow().source.key, - readable: !state[READ].is_empty(), - writable: !state[WRITE].is_empty(), - }, - )?; + // Create the event that we are interested in. + let event = { + let mut event = Event::none(handle.borrow().source.key); + event.readable = !state[READ].is_empty(); + event.writable = !state[WRITE].is_empty(); + event + }; + + // Indicate that we are interested in this event. + handle + .borrow() + .source + .registration + .modify(&Reactor::get().poller, event)?; } Poll::Pending diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs index 4c575a2..9f31af9 100644 --- a/src/reactor/kqueue.rs +++ b/src/reactor/kqueue.rs @@ -7,7 +7,7 @@ use polling::{Event, PollMode, Poller}; use std::fmt; use std::io::Result; -use std::os::unix::io::RawFd; +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; use std::process::Child; /// The raw registration into the reactor. @@ -16,6 +16,12 @@ use std::process::Child; #[doc(hidden)] pub enum Registration { /// Raw file descriptor for readability/writability. + /// + /// + /// # Invariant + /// + /// This describes a valid file descriptor that has not been `close`d. It will not be + /// closed while this object is alive. Fd(RawFd), /// Raw signal number for signal delivery. @@ -35,24 +41,29 @@ impl fmt::Debug for Registration { } } -impl From for Registration { - #[inline] - fn from(raw: RawFd) -> Self { - Self::Fd(raw) +impl Registration { + /// Add this file descriptor into the reactor. + /// + /// # Safety + /// + /// The provided file descriptor must be valid and not be closed while this object is alive. + pub(crate) unsafe fn new(f: impl AsFd) -> Self { + Self::Fd(f.as_fd().as_raw_fd()) } -} -impl Registration { /// Registers the object into the reactor. #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { match self { - Self::Fd(raw) => poller.add(*raw, Event::none(token)), + Self::Fd(raw) => { + // SAFETY: This object's existence validates the invariants of Poller::add + unsafe { poller.add(*raw, Event::none(token)) } + } Self::Signal(signal) => { poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) } Self::Process(process) => poller.add_filter( - Process::new(process, ProcessOps::Exit), + unsafe { Process::new(process, ProcessOps::Exit) }, token, PollMode::Oneshot, ), @@ -63,12 +74,16 @@ impl Registration { #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { match self { - Self::Fd(raw) => poller.modify(*raw, interest), + Self::Fd(raw) => { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(*raw) }; + poller.modify(fd, interest) + } Self::Signal(signal) => { poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) } Self::Process(process) => poller.modify_filter( - Process::new(process, ProcessOps::Exit), + unsafe { Process::new(process, ProcessOps::Exit) }, interest.key, PollMode::Oneshot, ), @@ -79,9 +94,15 @@ impl Registration { #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { match self { - Self::Fd(raw) => poller.delete(*raw), + Self::Fd(raw) => { + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(*raw) }; + poller.delete(fd) + } Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), - Self::Process(process) => poller.delete_filter(Process::new(process, ProcessOps::Exit)), + Self::Process(process) => { + poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) }) + } } } } diff --git a/src/reactor/unix.rs b/src/reactor/unix.rs index 2db2437..b2f9b1b 100644 --- a/src/reactor/unix.rs +++ b/src/reactor/unix.rs @@ -4,12 +4,17 @@ use polling::{Event, Poller}; use std::fmt; use std::io::Result; -use std::os::unix::io::RawFd; +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; /// The raw registration into the reactor. #[doc(hidden)] pub struct Registration { /// Raw file descriptor on Unix. + /// + /// # Invariant + /// + /// This describes a valid file descriptor that has not been `close`d. It will not be + /// closed while this object is alive. raw: RawFd, } @@ -19,29 +24,38 @@ impl fmt::Debug for Registration { } } -impl From for Registration { - #[inline] - fn from(raw: RawFd) -> Self { - Self { raw } +impl Registration { + /// Add this file descriptor into the reactor. + /// + /// # Safety + /// + /// The provided file descriptor must be valid and not be closed while this object is alive. + pub(crate) unsafe fn new(f: impl AsFd) -> Self { + Self { + raw: f.as_fd().as_raw_fd(), + } } -} -impl Registration { /// Registers the object into the reactor. #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { - poller.add(self.raw, Event::none(token)) + // SAFETY: This object's existence validates the invariants of Poller::add + unsafe { poller.add(self.raw, Event::none(token)) } } /// Re-registers the object into the reactor. #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { - poller.modify(self.raw, interest) + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(self.raw) }; + poller.modify(fd, interest) } /// Deregisters the object from the reactor. #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { - poller.delete(self.raw) + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedFd::borrow_raw(self.raw) }; + poller.delete(fd) } } diff --git a/src/reactor/windows.rs b/src/reactor/windows.rs index 59f247f..1c92f00 100644 --- a/src/reactor/windows.rs +++ b/src/reactor/windows.rs @@ -3,12 +3,17 @@ use polling::{Event, Poller}; use std::fmt; use std::io::Result; -use std::os::windows::io::RawSocket; +use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, RawSocket}; /// The raw registration into the reactor. #[doc(hidden)] pub struct Registration { /// Raw socket handle on Windows. + /// + /// # Invariant + /// + /// This describes a valid socket that has not been `close`d. It will not be + /// closed while this object is alive. raw: RawSocket, } @@ -18,29 +23,38 @@ impl fmt::Debug for Registration { } } -impl From for Registration { - #[inline] - fn from(raw: RawSocket) -> Self { - Self { raw } +impl Registration { + /// Add this file descriptor into the reactor. + /// + /// # Safety + /// + /// The provided file descriptor must be valid and not be closed while this object is alive. + pub(crate) unsafe fn new(f: impl AsSocket) -> Self { + Self { + raw: f.as_socket().as_raw_socket(), + } } -} -impl Registration { /// Registers the object into the reactor. #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { - poller.add(self.raw, Event::none(token)) + // SAFETY: This object's existence validates the invariants of Poller::add + unsafe { poller.add(self.raw, Event::none(token)) } } /// Re-registers the object into the reactor. #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { - poller.modify(self.raw, interest) + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) }; + poller.modify(fd, interest) } /// Deregisters the object from the reactor. #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { - poller.delete(self.raw) + // SAFETY: self.raw is a valid file descriptor + let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) }; + poller.delete(fd) } }