Skip to content

Commit

Permalink
Replace AsyncWaitQueue with lockfree immutable deque.
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenCleary committed Dec 28, 2023
1 parent 0361015 commit a27d7a7
Show file tree
Hide file tree
Showing 12 changed files with 1,190 additions and 677 deletions.
130 changes: 57 additions & 73 deletions src/Nito.AsyncEx.Coordination/AsyncAutoResetEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx.Internals;
using Nito.AsyncEx.Synchronous;

// Original idea by Stephen Toub: http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266923.aspx
Expand All @@ -11,96 +12,50 @@ namespace Nito.AsyncEx
/// <summary>
/// An async-compatible auto-reset event.
/// </summary>
[DebuggerDisplay("Id = {Id}, IsSet = {_set}")]
[DebuggerDisplay("Id = {Id}, IsSet = {_state.IsSet}")]
[DebuggerTypeProxy(typeof(DebugView))]
public sealed class AsyncAutoResetEvent
{
/// <summary>
/// The queue of TCSs that other tasks are awaiting.
/// </summary>
private readonly IAsyncWaitQueue<object> _queue;

/// <summary>
/// The current state of the event.
/// </summary>
private bool _set;

/// <summary>
/// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
/// </summary>
private int _id;

/// <summary>
/// The object used for mutual exclusion.
/// </summary>
private readonly object _mutex;

/// <summary>
/// Creates an async-compatible auto-reset event.
/// </summary>
/// <param name="set">Whether the auto-reset event is initially set or unset.</param>
/// <param name="queue">The wait queue used to manage waiters. This may be <c>null</c> to use a default (FIFO) queue.</param>
internal AsyncAutoResetEvent(bool set, IAsyncWaitQueue<object>? queue)
{
_queue = queue ?? new DefaultAsyncWaitQueue<object>();
_set = set;
_mutex = new object();
}

/// <summary>
/// Creates an async-compatible auto-reset event.
/// </summary>
/// <param name="set">Whether the auto-reset event is initially set or unset.</param>
public AsyncAutoResetEvent(bool set)
: this(set, null)
{
_state = new(set, DefaultAsyncWaitQueue<object>.Empty);
}

/// <summary>
/// Creates an async-compatible auto-reset event that is initially unset.
/// </summary>
public AsyncAutoResetEvent()
: this(false, null)
/// <summary>
/// Creates an async-compatible auto-reset event that is initially unset.
/// </summary>
public AsyncAutoResetEvent()
: this(false)
{
}

/// <summary>
/// Gets a semi-unique identifier for this asynchronous auto-reset event.
/// </summary>
public int Id
{
get { return IdManager<AsyncAutoResetEvent>.GetId(ref _id); }
}
public int Id => IdManager<AsyncAutoResetEvent>.GetId(ref _id);

/// <summary>
/// Whether this event is currently set. This member is seldom used; code using this member has a high possibility of race conditions.
/// </summary>
public bool IsSet
{
get { lock (_mutex) return _set; }
}
public bool IsSet => InterlockedState.Read(ref _state).IsSet;

/// <summary>
/// Asynchronously waits for this event to be set. If the event is set, this method will auto-reset it and return immediately, even if the cancellation token is already signalled. If the wait is canceled, then it will not auto-reset this event.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel this wait.</param>
public Task WaitAsync(CancellationToken cancellationToken)
{
Task ret;
lock (_mutex)
{
if (_set)
{
_set = false;
ret = TaskConstants.Completed;
}
else
{
ret = _queue.Enqueue(_mutex, cancellationToken);
}
}

return ret;
Task<object>? result = null;
InterlockedState.Transform(ref _state, s => s switch
{
{ IsSet: true } => new State(false, s.Queue),
_ => new State(false, s.Queue.Enqueue(ApplyCancel, cancellationToken, out result)),
});
return result ?? Task.CompletedTask;
}

/// <summary>
Expand Down Expand Up @@ -136,17 +91,46 @@ public void Wait()
public void Set()
#pragma warning restore CA1200 // Avoid using cref tags with a prefix
{
lock (_mutex)
{
if (_queue.IsEmpty)
_set = true;
else
_queue.Dequeue();
}
Action? completion = null;
InterlockedState.Transform(ref _state, s => s switch
{
{ Queue.IsEmpty: true } => new State(true, s.Queue),
_ => new State(false, s.Queue.Dequeue(out completion)),
});
completion?.Invoke();
}

private void ApplyCancel(Func<IAsyncWaitQueue<object>, IAsyncWaitQueue<object>> cancel) =>
InterlockedState.Transform(ref _state, s => new State(s.IsSet, cancel(s.Queue)));

/// <summary>
/// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
/// </summary>
private int _id;

private State _state;

private sealed class State
{
public State(bool isSet, IAsyncWaitQueue<object> queue)
{
IsSet = isSet;
Queue = queue;
}

/// <summary>
/// The current state of the event.
/// </summary>
public bool IsSet { get; }

/// <summary>
/// The queue of TCSs that other tasks are awaiting.
/// </summary>
public IAsyncWaitQueue<object> Queue { get; }
}

// ReSharper disable UnusedMember.Local
[DebuggerNonUserCode]
// ReSharper disable UnusedMember.Local
[DebuggerNonUserCode]
private sealed class DebugView
{
private readonly AsyncAutoResetEvent _are;
Expand All @@ -158,9 +142,9 @@ public DebugView(AsyncAutoResetEvent are)

public int Id { get { return _are.Id; } }

public bool IsSet { get { return _are._set; } }
public bool IsSet { get { return _are._state.IsSet; } }

public IAsyncWaitQueue<object> WaitQueue { get { return _are._queue; } }
public IAsyncWaitQueue<object> WaitQueue { get { return _are._state.Queue; } }
}
// ReSharper restore UnusedMember.Local
}
Expand Down
147 changes: 66 additions & 81 deletions src/Nito.AsyncEx.Coordination/AsyncConditionVariable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx.Internals;
using Nito.AsyncEx.Synchronous;

namespace Nito.AsyncEx
Expand All @@ -13,76 +14,43 @@ namespace Nito.AsyncEx
[DebuggerTypeProxy(typeof(DebugView))]
public sealed class AsyncConditionVariable
{
/// <summary>
/// The lock associated with this condition variable.
/// </summary>
private readonly AsyncLock _asyncLock;

/// <summary>
/// The queue of waiting tasks.
/// </summary>
private readonly IAsyncWaitQueue<object> _queue;

/// <summary>
/// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
/// </summary>
private int _id;

/// <summary>
/// The object used for mutual exclusion.
/// </summary>
private readonly object _mutex;

/// <summary>
/// Creates an async-compatible condition variable associated with an async-compatible lock.
/// </summary>
/// <param name="asyncLock">The lock associated with this condition variable.</param>
/// <param name="queue">The wait queue used to manage waiters. This may be <c>null</c> to use a default (FIFO) queue.</param>
internal AsyncConditionVariable(AsyncLock asyncLock, IAsyncWaitQueue<object>? queue)
{
_asyncLock = asyncLock;
_queue = queue ?? new DefaultAsyncWaitQueue<object>();
_mutex = new object();
}

/// <summary>
/// Creates an async-compatible condition variable associated with an async-compatible lock.
/// </summary>
/// <param name="asyncLock">The lock associated with this condition variable.</param>
public AsyncConditionVariable(AsyncLock asyncLock)
: this(asyncLock, null)
{
_asyncLock = asyncLock;
_queue = DefaultAsyncWaitQueue<object>.Empty;
}

/// <summary>
/// Gets a semi-unique identifier for this asynchronous condition variable.
/// </summary>
public int Id
{
get { return IdManager<AsyncConditionVariable>.GetId(ref _id); }
}
/// <summary>
/// Gets a semi-unique identifier for this asynchronous condition variable.
/// </summary>
public int Id => IdManager<AsyncConditionVariable>.GetId(ref _id);

/// <summary>
/// <summary>
/// Sends a signal to a single task waiting on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when this method returns.
/// </summary>
public void Notify()
{
lock (_mutex)
{
if (!_queue.IsEmpty)
_queue.Dequeue();
}
{
Action? completion = null;
InterlockedState.Transform(ref _queue, q => q switch
{
{ IsEmpty: false } => q.Dequeue(out completion),
_ => q,
});
completion?.Invoke();
}

/// <summary>
/// Sends a signal to all tasks waiting on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when this method returns.
/// </summary>
public void NotifyAll()
{
lock (_mutex)
{
_queue.DequeueAll();
}
Action? completion = null;
InterlockedState.Transform(ref _queue, q => q.DequeueAll(out completion));
completion?.Invoke();
}

/// <summary>
Expand All @@ -91,39 +59,39 @@ public void NotifyAll()
/// <param name="cancellationToken">The cancellation signal used to cancel this wait.</param>
public Task WaitAsync(CancellationToken cancellationToken)
{
Task task;
lock (_mutex)
{
// Begin waiting for either a signal or cancellation.
task = _queue.Enqueue(_mutex, cancellationToken);
Task<object> task = null!;

// Attach to the signal or cancellation.
var ret = WaitAndRetakeLockAsync(task, _asyncLock);
// Begin waiting for either a signal or cancellation.
InterlockedState.Transform(ref _queue, q => q.Enqueue(ApplyCancel, cancellationToken, out task));

// Release the lock while we are waiting.
_asyncLock.ReleaseLock();
// Attach to the signal or cancellation.
var ret = WaitAndRetakeLockAsync(task, _asyncLock);

return ret;
}
}
// Release the lock while we are waiting.
_asyncLock.ReleaseLock();

private static async Task WaitAndRetakeLockAsync(Task task, AsyncLock asyncLock)
{
try
{
await task.ConfigureAwait(false);
}
finally
return ret;

static async Task WaitAndRetakeLockAsync(Task task, AsyncLock asyncLock)
{
// Re-take the lock.
await asyncLock.LockAsync().ConfigureAwait(false);
try
{
await task.ConfigureAwait(false);
}
finally
{
// Re-take the lock.
#pragma warning disable CA2016
await asyncLock.LockAsync().ConfigureAwait(false);
#pragma warning restore CA2016
}
}
}

/// <summary>
/// Asynchronously waits for a signal on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when the returned task completes.
/// </summary>
public Task WaitAsync()
/// <summary>
/// Asynchronously waits for a signal on this condition variable. The associated lock MUST be held when calling this method, and it will still be held when the returned task completes.
/// </summary>
public Task WaitAsync()
{
return WaitAsync(CancellationToken.None);
}
Expand All @@ -145,8 +113,25 @@ public void Wait()
Wait(CancellationToken.None);
}

// ReSharper disable UnusedMember.Local
[DebuggerNonUserCode]
private void ApplyCancel(Func<IAsyncWaitQueue<object>, IAsyncWaitQueue<object>> cancel) => InterlockedState.Transform(ref _queue, cancel);

/// <summary>
/// The lock associated with this condition variable.
/// </summary>
private readonly AsyncLock _asyncLock;

/// <summary>
/// The queue of waiting tasks.
/// </summary>
private IAsyncWaitQueue<object> _queue;

/// <summary>
/// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
/// </summary>
private int _id;

// ReSharper disable UnusedMember.Local
[DebuggerNonUserCode]
private sealed class DebugView
{
private readonly AsyncConditionVariable _cv;
Expand All @@ -156,11 +141,11 @@ public DebugView(AsyncConditionVariable cv)
_cv = cv;
}

public int Id { get { return _cv.Id; } }
public int Id => _cv.Id;

public AsyncLock AsyncLock { get { return _cv._asyncLock; } }
public AsyncLock AsyncLock => _cv._asyncLock;

public IAsyncWaitQueue<object> WaitQueue { get { return _cv._queue; } }
public IAsyncWaitQueue<object> WaitQueue => _cv._queue;
}
// ReSharper restore UnusedMember.Local
}
Expand Down
Loading

0 comments on commit a27d7a7

Please sign in to comment.