Skip to content

Commit

Permalink
use blocking state in select macro
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed May 2, 2024
1 parent 86debba commit e2f54ea
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 108 deletions.
64 changes: 42 additions & 22 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::err::{
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
use crate::waker::BlockingState;

/// Creates a multi-producer multi-consumer channel of unbounded capacity.
///
Expand Down Expand Up @@ -1358,6 +1359,14 @@ impl<T> fmt::Debug for IntoIter<T> {
}

impl<T> SelectHandle for Sender<T> {
fn start(&self) -> Option<BlockingState<'_>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().start_ref(),
SenderFlavor::List(chan) => chan.sender().start_ref(),
SenderFlavor::Zero(chan) => chan.start(),
}
}

fn try_select(&self, token: &mut Token) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().try_select(token),
Expand All @@ -1370,11 +1379,11 @@ impl<T> SelectHandle for Sender<T> {
None
}

fn register(&self, oper: Operation, cx: &Context) -> bool {
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
SenderFlavor::List(chan) => chan.sender().register(oper, cx),
SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
SenderFlavor::Array(chan) => chan.sender().register(oper, cx, state),
SenderFlavor::List(chan) => chan.sender().register(oper, cx, state),
SenderFlavor::Zero(chan) => chan.sender().register(oper, cx, state),
}
}

Expand Down Expand Up @@ -1402,11 +1411,11 @@ impl<T> SelectHandle for Sender<T> {
}
}

fn watch(&self, oper: Operation, cx: &Context) -> bool {
fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
SenderFlavor::Array(chan) => chan.sender().watch(oper, cx, state),
SenderFlavor::List(chan) => chan.sender().watch(oper, cx, state),
SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx, state),
}
}

Expand All @@ -1420,6 +1429,17 @@ impl<T> SelectHandle for Sender<T> {
}

impl<T> SelectHandle for Receiver<T> {
fn start(&self) -> Option<BlockingState<'_>> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().start_ref(),
ReceiverFlavor::List(chan) => chan.receiver().start_ref(),
ReceiverFlavor::Zero(chan) => chan.start(),
ReceiverFlavor::At(chan) => chan.start(),
ReceiverFlavor::Tick(chan) => chan.start(),
ReceiverFlavor::Never(chan) => chan.start(),
}
}

fn try_select(&self, token: &mut Token) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
Expand All @@ -1442,14 +1462,14 @@ impl<T> SelectHandle for Receiver<T> {
}
}

fn register(&self, oper: Operation, cx: &Context) -> bool {
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
ReceiverFlavor::At(chan) => chan.register(oper, cx),
ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
ReceiverFlavor::Never(chan) => chan.register(oper, cx),
ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx, state),
ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx, state),
ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx, state),
ReceiverFlavor::At(chan) => chan.register(oper, cx, state),
ReceiverFlavor::Tick(chan) => chan.register(oper, cx, state),
ReceiverFlavor::Never(chan) => chan.register(oper, cx, state),
}
}

Expand Down Expand Up @@ -1486,14 +1506,14 @@ impl<T> SelectHandle for Receiver<T> {
}
}

fn watch(&self, oper: Operation, cx: &Context) -> bool {
fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
ReceiverFlavor::At(chan) => chan.watch(oper, cx),
ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx, state),
ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx, state),
ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx, state),
ReceiverFlavor::At(chan) => chan.watch(oper, cx, state),
ReceiverFlavor::Tick(chan) => chan.watch(oper, cx, state),
ReceiverFlavor::Never(chan) => chan.watch(oper, cx, state),
}
}

Expand Down
48 changes: 37 additions & 11 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crossbeam_utils::{Backoff, CachePadded};
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
use crate::waker::{BlockingState, SyncWaker};

/// A slot in a channel.
struct Slot<T> {
Expand Down Expand Up @@ -401,7 +401,7 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
self.senders.register2(oper, cx, &state);
self.senders.register(oper, cx, &state);

// Has the channel become ready just now?
if !self.is_full() || self.is_disconnected() {
Expand Down Expand Up @@ -478,7 +478,7 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
self.receivers.register2(oper, cx, &mut state);
self.receivers.register(oper, cx, &state);

// Has the channel become ready just now?
if !self.is_empty() || self.is_disconnected() {
Expand Down Expand Up @@ -629,7 +629,18 @@ pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<'a, T> Receiver<'a, T> {
/// Same as `SelectHandle::start`, but with a more specific lifetime.
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
Some(self.0.receivers.start())
}
}

impl<T> SelectHandle for Receiver<'_, T> {
fn start(&self) -> Option<BlockingState<'_>> {
self.start_ref()
}

fn try_select(&self, token: &mut Token) -> bool {
self.0.start_recv(token) == Status::Ready
}
Expand All @@ -638,8 +649,9 @@ impl<T> SelectHandle for Receiver<'_, T> {
None
}

fn register(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.register(oper, cx);
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
let state = state.expect("Receiver::start returns blocking state");
self.0.receivers.register(oper, cx, state);
self.is_ready()
}

Expand All @@ -655,8 +667,9 @@ impl<T> SelectHandle for Receiver<'_, T> {
!self.0.is_empty() || self.0.is_disconnected()
}

fn watch(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.watch(oper, cx);
fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
let state = state.expect("Receiver::start returns blocking state");
self.0.receivers.watch(oper, cx, state);
self.is_ready()
}

Expand All @@ -665,7 +678,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
}

impl<'a, T> Sender<'a, T> {
/// Same as `SelectHandle::start`, but with a more specific lifetime.
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
Some(self.0.senders.start())
}
}

impl<T> SelectHandle for Sender<'_, T> {
fn start(&self) -> Option<BlockingState<'_>> {
self.start_ref()
}

fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token) == Status::Ready
}
Expand All @@ -674,8 +698,9 @@ impl<T> SelectHandle for Sender<'_, T> {
None
}

fn register(&self, oper: Operation, cx: &Context) -> bool {
self.0.senders.register(oper, cx);
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
let state = state.expect("Sender::start returns blocking state");
self.0.senders.register(oper, cx, state);
self.is_ready()
}

Expand All @@ -691,8 +716,9 @@ impl<T> SelectHandle for Sender<'_, T> {
!self.0.is_full() || self.0.is_disconnected()
}

fn watch(&self, oper: Operation, cx: &Context) -> bool {
self.0.senders.watch(oper, cx);
fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
let state = state.expect("Sender::start returns blocking state");
self.0.senders.watch(oper, cx, state);
self.is_ready()
}

Expand Down
14 changes: 12 additions & 2 deletions crossbeam-channel/src/flavors/at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
use crate::utils;
use crate::waker::BlockingState;

/// Result of a receive operation.
pub(crate) type AtToken = Option<Instant>;
Expand Down Expand Up @@ -140,6 +141,10 @@ impl Channel {
}

impl SelectHandle for Channel {
fn start(&self) -> Option<BlockingState<'_>> {
None
}

#[inline]
fn try_select(&self, token: &mut Token) -> bool {
match self.try_recv() {
Expand All @@ -166,7 +171,12 @@ impl SelectHandle for Channel {
}

#[inline]
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
fn register(
&self,
_oper: Operation,
_cx: &Context,
_state: Option<&BlockingState<'_>>,
) -> bool {
self.is_ready()
}

Expand All @@ -184,7 +194,7 @@ impl SelectHandle for Channel {
}

#[inline]
fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
fn watch(&self, _oper: Operation, _cx: &Context, _state: Option<&BlockingState<'_>>) -> bool {
self.is_ready()
}

Expand Down
47 changes: 38 additions & 9 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crossbeam_utils::{Backoff, CachePadded};
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
use crate::waker::{BlockingState, SyncWaker};

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
Expand Down Expand Up @@ -510,7 +510,7 @@ impl<T> Channel<T> {
// Prepare for blocking until a sender wakes us up.
Context::with(|cx| {
let oper = Operation::hook(token);
self.receivers.register2(oper, cx, &state);
self.receivers.register(oper, cx, &state);

// Has the channel become ready just now?
if !self.is_empty() || self.is_disconnected() {
Expand Down Expand Up @@ -734,7 +734,18 @@ pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<'a, T> Receiver<'a, T> {
/// Same as `SelectHandle::start`, but with a more specific lifetime.
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
Some(self.0.receivers.start())
}
}

impl<T> SelectHandle for Receiver<'_, T> {
fn start(&self) -> Option<BlockingState<'_>> {
self.start_ref()
}

fn try_select(&self, token: &mut Token) -> bool {
self.0.start_recv(token) == Status::Ready
}
Expand All @@ -743,8 +754,9 @@ impl<T> SelectHandle for Receiver<'_, T> {
None
}

fn register(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.register(oper, cx);
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
let state = state.expect("Receiver::start returns blocking state");
self.0.receivers.register(oper, cx, state);
self.is_ready()
}

Expand All @@ -760,8 +772,9 @@ impl<T> SelectHandle for Receiver<'_, T> {
!self.0.is_empty() || self.0.is_disconnected()
}

fn watch(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.watch(oper, cx);
fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
let state = state.expect("Receiver::start returns blocking state");
self.0.receivers.watch(oper, cx, state);
self.is_ready()
}

Expand All @@ -770,7 +783,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
}
}

impl<T> SelectHandle for Sender<'_, T> {
impl<'a, T> Sender<'a, T> {
/// Same as `SelectHandle::start`, but with a more specific lifetime.
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
None
}
}

impl<'a, T> SelectHandle for Sender<'a, T> {
fn start(&self) -> Option<BlockingState<'a>> {
None
}

fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token)
}
Expand All @@ -779,7 +803,12 @@ impl<T> SelectHandle for Sender<'_, T> {
None
}

fn register(&self, _oper: Operation, _cx: &Context) -> bool {
fn register(
&self,
_oper: Operation,
_cx: &Context,
_state: Option<&BlockingState<'_>>,
) -> bool {
self.is_ready()
}

Expand All @@ -793,7 +822,7 @@ impl<T> SelectHandle for Sender<'_, T> {
true
}

fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
fn watch(&self, _oper: Operation, _cx: &Context, _state: Option<&BlockingState<'_>>) -> bool {
self.is_ready()
}

Expand Down
Loading

0 comments on commit e2f54ea

Please sign in to comment.