This repository has been archived by the owner on Nov 5, 2018. It is now read-only.

Implement Guard::safepoint() (#43)
* Implement Guard::safepoint()

* Add a comment in safepoint()

* Remove the SeqCst fence from repin()

* Rename safepoint() into repin()

* Fix a glitch in documentation
jeehoonkang authored and Stjepan Glavina committed Nov 26, 2017
1 parent f2da300 commit 983b33c
Showing 3 changed files with 83 additions and 23 deletions.
20 changes: 10 additions & 10 deletions src/collector.rs
@@ -325,22 +325,22 @@ mod tests {
         let collector = Collector::new();
         let handle = collector.handle();
 
-        unsafe {
-            let guard = &handle.pin();
+        let mut guard = handle.pin();
 
-            let mut v = Vec::with_capacity(COUNT);
-            for i in 0..COUNT {
-                v.push(Elem(i as i32));
-            }
+        let mut v = Vec::with_capacity(COUNT);
+        for i in 0..COUNT {
+            v.push(Elem(i as i32));
+        }
 
-            let a = Owned::new(v).into_shared(guard);
-            guard.defer(move || a.into_owned());
+        {
+            let a = Owned::new(v).into_shared(&guard);
+            unsafe { guard.defer(move || a.into_owned()); }
             guard.flush();
+        }
 
         while DROPS.load(Ordering::Relaxed) < COUNT {
-            let guard = &handle.pin();
-            collector.global.collect(guard);
+            guard.repin();
+            collector.global.collect(&guard);
         }
         assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
-        }
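For reference, the pattern in the updated test above can be written against the crate's public API. The following is a minimal illustrative sketch, not part of the commit: it uses the crate-level `epoch::pin()` instead of a collector handle, and the value and iteration count are made up. The point is that a long-lived guard calls `repin()` on every iteration, so it never holds an old epoch and never stalls reclamation.

    extern crate crossbeam_epoch;

    use crossbeam_epoch::{self as epoch, Atomic};
    use std::sync::atomic::Ordering::SeqCst;

    fn main() {
        let a = Atomic::new(777);
        let mut guard = epoch::pin();

        // A long-running loop: re-pin at the top of every iteration so this
        // long-lived guard never pins an old epoch and delays reclamation.
        for _ in 0..1_000 {
            guard.repin();
            {
                let p = a.load(SeqCst, &guard);
                assert_eq!(unsafe { p.as_ref() }, Some(&777));
            }
        }
    }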
37 changes: 37 additions & 0 deletions src/guard.rs
@@ -194,6 +194,43 @@ impl Guard {
         }
     }
 
+    /// Unpins and then immediately re-pins the thread.
+    ///
+    /// This method is useful when you don't want to delay the advancement of the global epoch by
+    /// holding an old epoch. For safety, you should not maintain any guard-based reference across
+    /// the call (this is enforced by `&mut self`). The thread will only be repinned if this is
+    /// the only active guard for the current thread.
+    ///
+    /// If this method is called from an [`unprotected`] guard, the call is simply a no-op.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use crossbeam_epoch::{self as epoch, Atomic};
+    /// use std::sync::atomic::Ordering::SeqCst;
+    /// use std::thread;
+    /// use std::time::Duration;
+    ///
+    /// let a = Atomic::new(777);
+    /// let mut guard = epoch::pin();
+    /// {
+    ///     let p = a.load(SeqCst, &guard);
+    ///     assert_eq!(unsafe { p.as_ref() }, Some(&777));
+    /// }
+    /// guard.repin();
+    /// {
+    ///     let p = a.load(SeqCst, &guard);
+    ///     assert_eq!(unsafe { p.as_ref() }, Some(&777));
+    /// }
+    /// ```
+    ///
+    /// [`unprotected`]: fn.unprotected.html
+    pub fn repin(&mut self) {
+        if let Some(local) = unsafe { self.local.as_ref() } {
+            local.repin();
+        }
+    }
+
     /// Temporarily unpins the thread, executes the given function and then re-pins the thread.
     ///
     /// This method is useful when you need to perform a long-running operation (e.g. sleeping)
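A note on the `&mut self` receiver above: it is what enforces the documented safety requirement that no guard-based reference be kept across the call. Loading through the guard borrows it immutably, so the borrow checker refuses to re-pin while such a reference is still in use. A small sketch, with illustrative values only:

    extern crate crossbeam_epoch;

    use crossbeam_epoch::{self as epoch, Atomic};
    use std::sync::atomic::Ordering::SeqCst;

    fn main() {
        let a = Atomic::new(1);
        let mut guard = epoch::pin();

        {
            // `p` borrows `guard` immutably for as long as it is in use.
            let p = a.load(SeqCst, &guard);
            assert_eq!(unsafe { p.as_ref() }, Some(&1));
            // Calling `guard.repin()` here, with `p` still used afterwards, would
            // fail to compile: `guard` cannot be borrowed mutably while `p` lives.
        }

        // No guard-based reference is live anymore, so re-pinning is allowed.
        guard.repin();
    }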
49 changes: 36 additions & 13 deletions src/internal.rs
@@ -21,7 +21,7 @@ use core::mem::{self, ManuallyDrop};
 use core::num::Wrapping;
 use core::ptr;
 use core::sync::atomic;
-use core::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
+use core::sync::atomic::Ordering;
 use alloc::boxed::Box;
 use alloc::arc::Arc;
 
@@ -63,10 +63,10 @@ impl Global {
 
     /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
     pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
-        let epoch = self.epoch.load(Relaxed);
+        let epoch = self.epoch.load(Ordering::Relaxed);
         let bag = mem::replace(bag, Bag::new());
 
-        atomic::fence(SeqCst);
+        atomic::fence(Ordering::SeqCst);
 
         self.queue.push((epoch, bag), guard);
     }
@@ -106,8 +106,8 @@ impl Global {
     /// `try_advance()` is annotated `#[cold]` because it is rarely called.
     #[cold]
     pub fn try_advance(&self, guard: &Guard) -> Epoch {
-        let global_epoch = self.epoch.load(Relaxed);
-        atomic::fence(SeqCst);
+        let global_epoch = self.epoch.load(Ordering::Relaxed);
+        atomic::fence(Ordering::SeqCst);
 
         // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
         // easy to implement in a lock-free manner. However, traversal can be slow due to cache
@@ -121,7 +121,7 @@
                     return global_epoch;
                 }
                 Ok(local) => {
-                    let local_epoch = local.epoch.load(Relaxed);
+                    let local_epoch = local.epoch.load(Ordering::Relaxed);
 
                     // If the participant was pinned in a different epoch, we cannot advance the
                     // global epoch just yet.
@@ -131,7 +131,7 @@
                 }
             }
         }
-        atomic::fence(Acquire);
+        atomic::fence(Ordering::Acquire);
 
         // All pinned participants were pinned in the current global epoch.
         // Now let's advance the global epoch...
@@ -141,7 +141,7 @@
         // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
         // advanced two steps ahead of it.
         let new_epoch = global_epoch.successor();
-        self.epoch.store(new_epoch, Release);
+        self.epoch.store(new_epoch, Ordering::Release);
         new_epoch
     }
 }
@@ -240,7 +240,7 @@ impl Local {
         self.guard_count.set(guard_count.checked_add(1).unwrap());
 
         if guard_count == 0 {
-            let global_epoch = self.global().epoch.load(Relaxed);
+            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
             let new_epoch = global_epoch.pinned();
 
             // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
@@ -257,11 +257,11 @@
             // Both instructions have the effect of a full barrier, but benchmarks have shown
             // that the second one makes pinning faster in this particular case.
             let current = Epoch::starting();
-            let previous = self.epoch.compare_and_swap(current, new_epoch, SeqCst);
+            let previous = self.epoch.compare_and_swap(current, new_epoch, Ordering::SeqCst);
             debug_assert_eq!(current, previous, "participant was expected to be unpinned");
         } else {
-            self.epoch.store(new_epoch, Relaxed);
-            atomic::fence(SeqCst);
+            self.epoch.store(new_epoch, Ordering::Relaxed);
+            atomic::fence(Ordering::SeqCst);
         }
 
         // Increment the pin counter.
@@ -285,14 +285,37 @@
         self.guard_count.set(guard_count - 1);
 
         if guard_count == 1 {
-            self.epoch.store(Epoch::starting(), Release);
+            self.epoch.store(Epoch::starting(), Ordering::Release);
 
             if self.handle_count.get() == 0 {
                 self.finalize();
             }
         }
     }
 
+    /// Unpins and then pins the `Local`.
+    #[inline]
+    pub fn repin(&self) {
+        let guard_count = self.guard_count.get();
+
+        // Update the local epoch only if there's only one guard.
+        if guard_count == 1 {
+            let epoch = self.epoch.load(Ordering::Relaxed);
+            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
+
+            // Update the local epoch only if the global epoch is greater than the local epoch.
+            if epoch != global_epoch {
+                // We store the new epoch with `Release` because we need to ensure any memory
+                // accesses from the previous epoch do not leak into the new one.
+                self.epoch.store(global_epoch, Ordering::Release);
+
+                // However, we don't need a following `SeqCst` fence, because it is safe for memory
+                // accesses from the new epoch to be executed before updating the local epoch. At
+                // worst, other threads will see the new epoch late and delay GC slightly.
+            }
+        }
+    }
+
     /// Increments the handle count.
     #[inline]
     pub fn acquire_handle(&self) {
