diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 04aeb866a..8ca9d7168 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -4,7 +4,7 @@ use std::boxed::Box; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::mem::MaybeUninit; -use std::ptr; +use std::ptr::{self, NonNull}; use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use std::time::Instant; @@ -56,10 +56,46 @@ impl Slot { }; /// Waits until a message is written into the slot. - fn wait_write(&self) { - let backoff = Backoff::new(); - while self.state.load(Ordering::Acquire) & WRITE == 0 { - backoff.snooze(); + fn wait_write(&self, receivers: &SyncWaker, token: &mut Token) { + let mut state = receivers.start(); + loop { + // Try reading the message several times. + let backoff = Backoff::new(); + loop { + if self.state.load(Ordering::Acquire) & WRITE != 0 { + return; + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } + } + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + let oper = Operation::hook(token); + receivers.register(oper, cx, &state); + + // Was the emssage just sent? + if self.state.load(Ordering::Acquire) & WRITE != 0 { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(None); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + receivers.unregister(oper).unwrap(); + } + Selected::Operation(_) => {} + } + + state.unpark(); + }); } } } @@ -85,14 +121,46 @@ impl Block { } /// Waits until the next pointer is set. - fn wait_next(&self) -> *mut Self { - let backoff = Backoff::new(); + fn wait_next(&self, receivers: &SyncWaker, token: &mut Token) -> *mut Self { + let mut state = receivers.start(); loop { - let next = self.next.load(Ordering::Acquire); - if !next.is_null() { - return next; + // Try reading the message several times. + let backoff = Backoff::new(); + loop { + if let Some(next) = NonNull::new(self.next.load(Ordering::Acquire)) { + return next.as_ptr(); + } + + if backoff.is_completed() { + break; + } else { + backoff.snooze(); + } } - backoff.snooze(); + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + let oper = Operation::hook(token); + receivers.register(oper, cx, &state); + + // Was the next pointer just written? + if !self.next.load(Ordering::Acquire).is_null() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(None); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + receivers.unregister(oper).unwrap(); + } + Selected::Operation(_) => {} + } + + state.unpark(); + }); } } @@ -382,7 +450,7 @@ impl Channel { Ok(_) => unsafe { // If we've reached the end of the block, move to the next one. if offset + 1 == BLOCK_CAP { - let next = (*block).wait_next(); + let next = (*block).wait_next(&self.receivers, token); let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { next_index |= MARK_BIT; @@ -416,7 +484,7 @@ impl Channel { let block = token.list.block as *mut Block; let offset = token.list.offset; let slot = unsafe { (*block).slots.get_unchecked(offset) }; - slot.wait_write(); + slot.wait_write(&self.receivers, token); let msg = unsafe { slot.msg.get().read().assume_init() }; // Destroy the block if we've reached the end, or if another thread wanted to destroy but @@ -610,6 +678,7 @@ impl Channel { /// /// This method should only be called when all receivers are dropped. fn discard_all_messages(&self) { + let token = &mut Token::default(); let backoff = Backoff::new(); let mut tail = self.tail.index.load(Ordering::Acquire); loop { @@ -651,10 +720,10 @@ impl Channel { if offset < BLOCK_CAP { // Drop the message in the slot. let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); + slot.wait_write(&self.receivers, token); (*slot.msg.get()).assume_init_drop(); } else { - (*block).wait_next(); + (*block).wait_next(&self.receivers, token); // Deallocate the block and move to the next one. let next = (*block).next.load(Ordering::Acquire); drop(Box::from_raw(block));