Skip to content
This repository has been archived by the owner on Nov 5, 2018. It is now read-only.

Implement Guard::safepoint() #43

Merged
5 commits merged into from
Nov 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,22 +325,22 @@ mod tests {
let collector = Collector::new();
let handle = collector.handle();

unsafe {
let guard = &handle.pin();
let mut guard = handle.pin();

let mut v = Vec::with_capacity(COUNT);
for i in 0..COUNT {
v.push(Elem(i as i32));
}
let mut v = Vec::with_capacity(COUNT);
for i in 0..COUNT {
v.push(Elem(i as i32));
}

let a = Owned::new(v).into_shared(guard);
guard.defer(move || a.into_owned());
{
let a = Owned::new(v).into_shared(&guard);
unsafe { guard.defer(move || a.into_owned()); }
guard.flush();
}

while DROPS.load(Ordering::Relaxed) < COUNT {
let guard = &handle.pin();
collector.global.collect(guard);
guard.repin();
collector.global.collect(&guard);
}
assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
}
Expand Down
37 changes: 37 additions & 0 deletions src/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,43 @@ impl Guard {
}
}

/// Unpins and then immediately re-pins the thread.
///
/// This method is useful when you don't want delay the advancement of the global epoch by
/// holding an old epoch. For safety, you should not maintain any guard-based reference across
/// the call (the latter is enforced by `&mut self`). The thread will only be repinned if this
/// is the only active guard for the current thread.
///
/// If this method is called from an [`unprotected`] guard, then the call will be just no-op.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch::{self as epoch, Atomic};
/// use std::sync::atomic::Ordering::SeqCst;
/// use std::thread;
/// use std::time::Duration;
///
/// let a = Atomic::new(777);
/// let mut guard = epoch::pin();
/// {
/// let p = a.load(SeqCst, &guard);
/// assert_eq!(unsafe { p.as_ref() }, Some(&777));
/// }
/// guard.repin();
/// {
/// let p = a.load(SeqCst, &guard);
/// assert_eq!(unsafe { p.as_ref() }, Some(&777));
/// }
/// ```
///
/// [`unprotected`]: fn.unprotected.html
pub fn repin(&mut self) {
if let Some(local) = unsafe { self.local.as_ref() } {
local.repin();
}
}

/// Temporarily unpins the thread, executes the given function and then re-pins the thread.
///
/// This method is useful when you need to perform a long-running operation (e.g. sleeping)
Expand Down
49 changes: 36 additions & 13 deletions src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::ptr;
use core::sync::atomic;
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
use core::sync::atomic::Ordering;
use alloc::boxed::Box;
use alloc::arc::Arc;

Expand Down Expand Up @@ -63,10 +63,10 @@ impl Global {

/// Pushes the bag into the global queue and replaces the bag with a new empty bag.
pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
let epoch = self.epoch.load(Relaxed);
let epoch = self.epoch.load(Ordering::Relaxed);
let bag = mem::replace(bag, Bag::new());

atomic::fence(SeqCst);
atomic::fence(Ordering::SeqCst);

self.queue.push((epoch, bag), guard);
}
Expand Down Expand Up @@ -106,8 +106,8 @@ impl Global {
/// `try_advance()` is annotated `#[cold]` because it is rarely called.
#[cold]
pub fn try_advance(&self, guard: &Guard) -> Epoch {
let global_epoch = self.epoch.load(Relaxed);
atomic::fence(SeqCst);
let global_epoch = self.epoch.load(Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);

// TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
// easy to implement in a lock-free manner. However, traversal can be slow due to cache
Expand All @@ -121,7 +121,7 @@ impl Global {
return global_epoch;
}
Ok(local) => {
let local_epoch = local.epoch.load(Relaxed);
let local_epoch = local.epoch.load(Ordering::Relaxed);

// If the participant was pinned in a different epoch, we cannot advance the
// global epoch just yet.
Expand All @@ -131,7 +131,7 @@ impl Global {
}
}
}
atomic::fence(Acquire);
atomic::fence(Ordering::Acquire);

// All pinned participants were pinned in the current global epoch.
// Now let's advance the global epoch...
Expand All @@ -141,7 +141,7 @@ impl Global {
// called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
// advanced two steps ahead of it.
let new_epoch = global_epoch.successor();
self.epoch.store(new_epoch, Release);
self.epoch.store(new_epoch, Ordering::Release);
new_epoch
}
}
Expand Down Expand Up @@ -240,7 +240,7 @@ impl Local {
self.guard_count.set(guard_count.checked_add(1).unwrap());

if guard_count == 0 {
let global_epoch = self.global().epoch.load(Relaxed);
let global_epoch = self.global().epoch.load(Ordering::Relaxed);
let new_epoch = global_epoch.pinned();

// Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
Expand All @@ -257,11 +257,11 @@ impl Local {
// Both instructions have the effect of a full barrier, but benchmarks have shown
// that the second one makes pinning faster in this particular case.
let current = Epoch::starting();
let previous = self.epoch.compare_and_swap(current, new_epoch, SeqCst);
let previous = self.epoch.compare_and_swap(current, new_epoch, Ordering::SeqCst);
debug_assert_eq!(current, previous, "participant was expected to be unpinned");
} else {
self.epoch.store(new_epoch, Relaxed);
atomic::fence(SeqCst);
self.epoch.store(new_epoch, Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);
}

// Increment the pin counter.
Expand All @@ -285,14 +285,37 @@ impl Local {
self.guard_count.set(guard_count - 1);

if guard_count == 1 {
self.epoch.store(Epoch::starting(), Release);
self.epoch.store(Epoch::starting(), Ordering::Release);

if self.handle_count.get() == 0 {
self.finalize();
}
}
}

/// Unpins and then pins the `Local`.
#[inline]
pub fn repin(&self) {
let guard_count = self.guard_count.get();

// Update the local epoch only if there's only one guard.
if guard_count == 1 {
let epoch = self.epoch.load(Ordering::Relaxed);
let global_epoch = self.global().epoch.load(Ordering::Relaxed);

// Update the local epoch only if the global epoch is greater than the local epoch.
if epoch != global_epoch {
// We store the new epoch with `Release` because we need to ensure any memory
// accesses from the previous epoch do not leak into the new one.
self.epoch.store(global_epoch, Ordering::Release);

// However, we don't need a following `SeqCst` fence, because it is safe for memory
// accesses from the new epoch to be executed before updating the local epoch. At
// worse, other threads will see the new epoch late and delay GC slightly.
}
}
}

/// Increments the handle count.
#[inline]
pub fn acquire_handle(&self) {
Expand Down