diff --git a/src/lib.rs b/src/lib.rs index 2c4c148..34fc8fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,7 +70,6 @@ use std::sync::Arc; use std::task::{Context, Poll, Waker}; use std::time::{Duration, Instant}; -use std::ops::Deref; #[cfg(all(not(async_io_no_io_safety), unix))] use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; #[cfg(unix)] @@ -90,7 +89,7 @@ use futures_lite::stream::{self, Stream}; use futures_lite::{future, pin, ready}; use socket2::{Domain, Protocol, SockAddr, Socket, Type}; -use crate::reactor::{Reactor, Source}; +use crate::reactor::{Reactor, Source, SourceContainer}; mod driver; mod reactor; @@ -617,7 +616,7 @@ impl Stream for Timer { #[derive(Debug)] pub struct Async { /// A source registered in the reactor. - source: ArcSource, + source: SourceContainer, /// The inner I/O handle. io: T, @@ -679,7 +678,7 @@ impl Async { } Ok(Async { - source: ArcSource(Reactor::get().insert_io(raw)?), + source: SourceContainer(Reactor::get().insert_io(raw)?), io, }) } @@ -755,7 +754,7 @@ impl Async { rustix::io::ioctl_fionbio(borrowed, true)?; Ok(Async { - source: ArcSource(Reactor::get().insert_io(sock)?), + source: SourceContainer(Reactor::get().insert_io(sock)?), io, }) } @@ -1953,21 +1952,3 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option) -> io::Re } Ok(socket) } - -#[derive(Debug)] -struct ArcSource(Arc); - -impl Drop for ArcSource { - fn drop(&mut self) { - // Deregister and ignore errors because destructors should not panic. - Reactor::get().remove_io(&self.0).ok(); - } -} - -impl Deref for ArcSource { - type Target = Source; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index cb8519a..f6c6ff1 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -3,7 +3,7 @@ use __private::QueueableSealed; use crate::reactor::{Reactor, Readable, Registration}; -use crate::{ArcSource, Async}; +use crate::{Async, SourceContainer}; use std::convert::{TryFrom, TryInto}; use std::future::Future; @@ -60,7 +60,7 @@ impl Filter { /// ``` pub fn new(mut filter: T) -> Result { Ok(Self(Async { - source: ArcSource(Reactor::get().insert_io(filter.registration())?), + source: SourceContainer(Reactor::get().insert_io(filter.registration())?), io: filter, })) } diff --git a/src/reactor.rs b/src/reactor.rs index 3e39534..98df2a5 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -5,6 +5,7 @@ use std::future::Future; use std::io; use std::marker::PhantomData; use std::mem; +use std::ops::Deref; use std::panic; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -663,3 +664,21 @@ impl>, T> Drop for Ready { } } } + +#[derive(Debug)] +pub(crate) struct SourceContainer(pub(crate) Arc); + +impl Drop for SourceContainer { + fn drop(&mut self) { + // Deregister and ignore errors because destructors should not panic. + Reactor::get().remove_io(&self.0).ok(); + } +} + +impl Deref for SourceContainer { + type Target = Source; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +}