Skip to content

Commit

Permalink
implement linearizable wakeups
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed May 2, 2024
1 parent e7b5922 commit 86debba
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 91 deletions.
121 changes: 92 additions & 29 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ pub(crate) struct Channel<T> {
receivers: SyncWaker,
}

/// The state of the channel after calling `start_recv` or `start_send`.
#[derive(PartialEq, Eq)]
enum Status {
/// The channel is ready to read or write to.
Ready,
/// There is currently a send or receive in progress holding up the queue.
/// All operations must block to preserve linearizability.
InProgress,
/// The channel is empty.
Empty,
}

impl<T> Channel<T> {
/// Creates a bounded channel of capacity `cap`.
pub(crate) fn with_capacity(cap: usize) -> Self {
Expand Down Expand Up @@ -135,7 +147,7 @@ impl<T> Channel<T> {
}

/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
fn start_send(&self, token: &mut Token) -> Status {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);

Expand All @@ -144,7 +156,7 @@ impl<T> Channel<T> {
if tail & self.mark_bit != 0 {
token.array.slot = ptr::null();
token.array.stamp = 0;
return true;
return Status::Ready;
}

// Deconstruct the tail.
Expand Down Expand Up @@ -179,7 +191,7 @@ impl<T> Channel<T> {
// Prepare the token for the follow-up call to `write`.
token.array.slot = slot as *const Slot<T> as *const u8;
token.array.stamp = tail + 1;
return true;
return Status::Ready;
}
Err(t) => {
tail = t;
Expand All @@ -193,7 +205,14 @@ impl<T> Channel<T> {
// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the channel is full.
return false;
return Status::Empty;
}

// The head was advanced but the stamp hasn't been updated yet,
// meaning a receive is in-progress. Spin for a bit waiting for
// the receive to complete before falling back to parking.
if backoff.is_completed() {
return Status::InProgress;
}

backoff.spin();
Expand Down Expand Up @@ -225,7 +244,7 @@ impl<T> Channel<T> {
}

/// Attempts to reserve a slot for receiving a message.
fn start_recv(&self, token: &mut Token) -> bool {
fn start_recv(&self, token: &mut Token) -> Status {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);

Expand Down Expand Up @@ -262,7 +281,7 @@ impl<T> Channel<T> {
// Prepare the token for the follow-up call to `read`.
token.array.slot = slot as *const Slot<T> as *const u8;
token.array.stamp = head.wrapping_add(self.one_lap);
return true;
return Status::Ready;
}
Err(h) => {
head = h;
Expand All @@ -280,13 +299,20 @@ impl<T> Channel<T> {
// ...then receive an error.
token.array.slot = ptr::null();
token.array.stamp = 0;
return true;
return Status::Ready;
} else {
// Otherwise, the receive operation is not ready.
return false;
return Status::Empty;
}
}

// The tail was advanced but the stamp hasn't been updated yet,
// meaning a send is in-progress. Spin for a bit waiting for
// the send to complete before falling back to parking.
if backoff.is_completed() {
return Status::InProgress;
}

backoff.spin();
head = self.head.load(Ordering::Relaxed);
} else {
Expand Down Expand Up @@ -317,11 +343,13 @@ impl<T> Channel<T> {

/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
if self.start_send(token) {
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
} else {
Err(TrySendError::Full(msg))
match self.send_blocking(msg, None, false) {
Ok(None) => Ok(()),
Ok(Some(msg)) => Err(TrySendError::Full(msg)),
Err(SendTimeoutError::Disconnected(msg)) => Err(TrySendError::Disconnected(msg)),
Err(SendTimeoutError::Timeout(_)) => {
unreachable!("called recv_blocking with deadline: None")
}
}
}

Expand All @@ -331,14 +359,30 @@ impl<T> Channel<T> {
msg: T,
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
self.send_blocking(msg, deadline, true)
.map(|value| assert!(value.is_none(), "called send_blocking with block: true"))
}

/// Sends a message into the channel.
pub(crate) fn send_blocking(
&self,
msg: T,
deadline: Option<Instant>,
block: bool,
) -> Result<Option<T>, SendTimeoutError<T>> {
let token = &mut Token::default();
let mut state = self.senders.start();
loop {
// Try sending a message several times.
let backoff = Backoff::new();
loop {
if self.start_send(token) {
let res = unsafe { self.write(token, msg) };
return res.map_err(SendTimeoutError::Disconnected);
match self.start_send(token) {
Status::Ready => {
let res = unsafe { self.write(token, msg) };
return res.map(|_| None).map_err(SendTimeoutError::Disconnected);
}
Status::Empty if !block => return Ok(Some(msg)),
_ => {}
}

if backoff.is_completed() {
Expand All @@ -357,7 +401,7 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
self.senders.register(oper, cx);
self.senders.register2(oper, cx, &state);

// Has the channel become ready just now?
if !self.is_full() || self.is_disconnected() {
Expand All @@ -375,30 +419,47 @@ impl<T> Channel<T> {
Selected::Operation(_) => {}
}
});

state.unpark();
}
}

/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();

if self.start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else {
Err(TryRecvError::Empty)
match self.recv_blocking(None, false) {
Ok(Some(value)) => Ok(value),
Ok(None) => Err(TryRecvError::Empty),
Err(RecvTimeoutError::Disconnected) => Err(TryRecvError::Disconnected),
Err(RecvTimeoutError::Timeout) => {
unreachable!("called recv_blocking with deadline: None")
}
}
}

/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
self.recv_blocking(deadline, true)
.map(|value| value.expect("called recv_blocking with block: true"))
}

pub(crate) fn recv_blocking(
&self,
deadline: Option<Instant>,
block: bool,
) -> Result<Option<T>, RecvTimeoutError> {
let token = &mut Token::default();
let mut state = self.receivers.start();
loop {
// Try receiving a message several times.
let backoff = Backoff::new();
loop {
if self.start_recv(token) {
let res = unsafe { self.read(token) };
return res.map_err(|_| RecvTimeoutError::Disconnected);
match self.start_recv(token) {
Status::Ready => {
let res = unsafe { self.read(token) };
return res.map(Some).map_err(|_| RecvTimeoutError::Disconnected);
}
Status::Empty if !block => return Ok(None),
_ => {}
}

if backoff.is_completed() {
Expand All @@ -417,7 +478,7 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
self.receivers.register(oper, cx);
self.receivers.register2(oper, cx, &mut state);

// Has the channel become ready just now?
if !self.is_empty() || self.is_disconnected() {
Expand All @@ -437,6 +498,8 @@ impl<T> Channel<T> {
Selected::Operation(_) => {}
}
});

state.unpark();
}
}

Expand Down Expand Up @@ -568,7 +631,7 @@ pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_recv(token)
self.0.start_recv(token) == Status::Ready
}

fn deadline(&self) -> Option<Instant> {
Expand Down Expand Up @@ -604,7 +667,7 @@ impl<T> SelectHandle for Receiver<'_, T> {

impl<T> SelectHandle for Sender<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token)
self.0.start_send(token) == Status::Ready
}

fn deadline(&self) -> Option<Instant> {
Expand Down
Loading

0 comments on commit 86debba

Please sign in to comment.