Skip to content

Commit

Permalink
refactor: move and rename ArcSource
Browse files Browse the repository at this point in the history
  • Loading branch information
byte-sourcerer committed Sep 9, 2023
1 parent 926d081 commit d839f3a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
27 changes: 4 additions & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use std::ops::Deref;
#[cfg(all(not(async_io_no_io_safety), unix))]
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
#[cfg(unix)]
Expand All @@ -90,7 +89,7 @@ use futures_lite::stream::{self, Stream};
use futures_lite::{future, pin, ready};
use socket2::{Domain, Protocol, SockAddr, Socket, Type};

use crate::reactor::{Reactor, Source};
use crate::reactor::{Reactor, Source, SourceContainer};

mod driver;
mod reactor;
Expand Down Expand Up @@ -617,7 +616,7 @@ impl Stream for Timer {
#[derive(Debug)]
pub struct Async<T> {
/// A source registered in the reactor.
source: ArcSource,
source: SourceContainer,

/// The inner I/O handle.
io: T,
Expand Down Expand Up @@ -679,7 +678,7 @@ impl<T: AsRawFd> Async<T> {
}

Ok(Async {
source: ArcSource(Reactor::get().insert_io(raw)?),
source: SourceContainer(Reactor::get().insert_io(raw)?),
io,
})
}
Expand Down Expand Up @@ -755,7 +754,7 @@ impl<T: AsRawSocket> Async<T> {
rustix::io::ioctl_fionbio(borrowed, true)?;

Ok(Async {
source: ArcSource(Reactor::get().insert_io(sock)?),
source: SourceContainer(Reactor::get().insert_io(sock)?),
io,
})
}
Expand Down Expand Up @@ -1953,21 +1952,3 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Re
}
Ok(socket)
}

#[derive(Debug)]
struct ArcSource(Arc<Source>);

impl Drop for ArcSource {
fn drop(&mut self) {
// Deregister and ignore errors because destructors should not panic.
Reactor::get().remove_io(&self.0).ok();
}
}

impl Deref for ArcSource {
type Target = Source;

fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
4 changes: 2 additions & 2 deletions src/os/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use __private::QueueableSealed;

use crate::reactor::{Reactor, Readable, Registration};
use crate::{ArcSource, Async};
use crate::{Async, SourceContainer};

use std::convert::{TryFrom, TryInto};
use std::future::Future;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl<T: Queueable> Filter<T> {
/// ```
pub fn new(mut filter: T) -> Result<Self> {
Ok(Self(Async {
source: ArcSource(Reactor::get().insert_io(filter.registration())?),
source: SourceContainer(Reactor::get().insert_io(filter.registration())?),
io: filter,
}))
}
Expand Down
19 changes: 19 additions & 0 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -663,3 +664,21 @@ impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
}
}
}

#[derive(Debug)]
pub(crate) struct SourceContainer(pub(crate) Arc<Source>);

impl Drop for SourceContainer {
fn drop(&mut self) {
// Deregister and ignore errors because destructors should not panic.
Reactor::get().remove_io(&self.0).ok();
}
}

impl Deref for SourceContainer {
type Target = Source;

fn deref(&self) -> &Self::Target {
self.0.deref()
}
}

0 comments on commit d839f3a

Please sign in to comment.