Skip to content

Commit

Permalink
Add executor spinning in the background
Browse files Browse the repository at this point in the history
To prevent users from having to implement
the feature themselves and provide an alternative
to the old spinning behaviour.
The new executor uses a long running `Task` instead of a `Thread`
to provide better integration with the C# ecosystem.
  • Loading branch information
Deric-W committed Jul 26, 2023
1 parent 78e8552 commit c70ec62
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/ros2cs/ros2cs_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ set(CS_SOURCES
Context.cs
GuardCondition.cs
executors/ManualExecutor.cs
executors/TaskExecutor.cs
properties/AssemblyInfo.cs
)

Expand Down
22 changes: 22 additions & 0 deletions src/ros2cs/ros2cs_core/executors/ManualExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ROS2.Executors
{
Expand Down Expand Up @@ -433,6 +434,27 @@ public void SpinWhile(Func<bool> condition, TimeSpan timeout)
}
}

/// <summary>
/// Create a task which calls <see cref="SpinWhile"/> when started.
/// </summary>
/// <remarks>
/// The resulting task prevents <see cref="TrySpin"/> and <see cref="Rescan"/> from being called
/// and this instance as well as its context from being disposed safely while it is running.
/// </remarks>
/// <param name="timeout"> Maximum time to wait for work to become available. </param>
/// <param name="cancellationToken"> Token to cancel the task. </param>
/// <returns> Task representing the spin operation. </returns>
public Task CreateSpinTask(TimeSpan timeout, CancellationToken cancellationToken)
{
return new Task(() => {
using (cancellationToken.Register(this.Interrupt))
{
this.SpinWhile(() => !cancellationToken.IsCancellationRequested, timeout);
}
cancellationToken.ThrowIfCancellationRequested();
}, cancellationToken, TaskCreationOptions.LongRunning);
}

/// <remarks>
/// This method is not thread safe and may not be called from
/// multiple threads simultaneously or while the executor is in use.
Expand Down
191 changes: 191 additions & 0 deletions src/ros2cs/ros2cs_core/executors/TaskExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ROS2.Executors
{
/// <summary>
/// Executor which wraps a <see cref="ManualExecutor"/> and automatically
/// executes the task created by <see cref="ManualExecutor.CreateSpinTask"/>.
/// </summary>
/// <remarks>
/// The spin task is automatically stopped when <see cref="Dispose"/>
/// is called or the context is shut down.
/// </remarks>
public sealed class TaskExecutor : IExecutor
{
/// <summary>
/// Task managed by this executor.
/// </summary>
public Task Task { get; private set; }

private readonly CancellationTokenSource CancellationSource = new CancellationTokenSource();

private readonly ManualExecutor Executor;

private readonly Context Context;

/// <param name="context"> Context associated with this executor. </param>
/// <param name="timeout"> Maximum time to wait for work to become available. </param>
public TaskExecutor(Context context, TimeSpan timeout)
{
this.Context = context;
this.Executor = new ManualExecutor(context);
this.Task = this.Executor.CreateSpinTask(timeout, this.CancellationSource.Token);
try
{
context.OnShutdown += this.StopSpinTask;
this.Task.Start();
}
catch (SystemException)
{
try
{
context.OnShutdown -= this.StopSpinTask;
}
finally
{
this.Executor.Dispose();
}
throw;
}
}

/// <inheritdoc/>
public bool IsDisposed
{
get => this.Executor.IsDisposed;
}

/// <inheritdoc/>
public int Count
{
get => this.Executor.Count;
}

/// <inheritdoc/>
public bool IsReadOnly
{
get => this.Executor.IsReadOnly;
}

/// <inheritdoc/>
public void Add(INode node)
{
this.Executor.Add(node);
}

/// <inheritdoc/>
public void Clear()
{
this.Executor.Clear();
}

/// <inheritdoc/>
public bool Contains(INode node)
{
return this.Executor.Contains(node);
}

/// <inheritdoc/>
public void CopyTo(INode[] array, int arrayIndex)
{
this.Executor.CopyTo(array, arrayIndex);
}

/// <inheritdoc/>
public bool Remove(INode node)
{
return this.Executor.Remove(node);
}

/// <inheritdoc/>
public IEnumerator<INode> GetEnumerator()
{
return this.Executor.GetEnumerator();
}

/// <inheritdoc/>
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}

/// <inheritdoc />
public void ScheduleRescan()
{
this.Executor.ScheduleRescan();
}

/// <inheritdoc />
public bool TryScheduleRescan(INode node)
{
return this.Executor.TryScheduleRescan(node);
}

/// <inheritdoc />
public void Wait()
{
this.Executor.Wait();
}

/// <inheritdoc />
public bool TryWait(TimeSpan timeout)
{
return this.Executor.TryWait(timeout);
}

/// <summary>
/// Stop the spin task and return after it has stopped.
/// </summary>
/// <remarks>
/// This function returns immediately if the spin task
/// has already been stopped.
/// </remarks>
private void StopSpinTask()
{
try
{
this.CancellationSource.Cancel();
}
catch (ObjectDisposedException)
{
// task has been canceled before
}
try
{
this.Task.Wait();
}
catch (AggregateException e)
{
e.Handle(inner => inner is TaskCanceledException);
}
catch (ObjectDisposedException)
{
// task has already stopped
}
}

/// <inheritdoc />
/// <remarks>
/// The wrapper handles stopping the spin task.
/// </remarks>
public void Dispose()
{
try
{
this.StopSpinTask();
}
catch (AggregateException)
{
// prevent faulted task from preventing disposal
}
this.Context.OnShutdown -= this.StopSpinTask;
this.Task.Dispose();
this.Executor.Dispose();
this.CancellationSource.Dispose();
}
}
}
1 change: 1 addition & 0 deletions src/ros2cs/ros2cs_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ if(BUILD_TESTING)
src/WaitSetTest.cs
src/GuardConditionTest.cs
src/ManualExecutorTest.cs
src/TaskExecutorTest.cs
)

add_dotnet_test(ros2cs_tests
Expand Down
89 changes: 88 additions & 1 deletion src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using ROS2.Executors;

Expand Down Expand Up @@ -319,6 +320,92 @@ public void Clear()
Assert.That(this.Executor.RescanScheduled, Is.True);
}

[Test]
public void SpinInTask()
{
using ManualResetEventSlim wasSpun = new ManualResetEventSlim(false);
using var guardCondition = this.Context.CreateGuardCondition(wasSpun.Set);
this.WaitSet.GuardConditions.Add(guardCondition);

using var cancellationSource = new CancellationTokenSource();
using Task spinTask = this.Executor.CreateSpinTask(TimeSpan.FromSeconds(0.5), cancellationSource.Token);

Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Created));

spinTask.Start();
try
{
while (spinTask.Status != TaskStatus.Running)
{
Thread.Yield(); // wait for task to be scheduled
}
Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.False);
guardCondition.Trigger();
Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.True);
wasSpun.Reset();
}
finally
{
cancellationSource.Cancel();
try
{
spinTask.Wait();
}
catch (AggregateException e)
{
e.Handle(inner => inner is TaskCanceledException);
}
}

Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Canceled));
guardCondition.Trigger();
Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.False);
}

[Test]
public void ExceptionWhileSpinningInTask()
{
using var guardCondition = this.Context.CreateGuardCondition(() =>
{
throw new SimulatedException("simulating runtime exception");
});
this.WaitSet.GuardConditions.Add(guardCondition);

using var cancellationSource = new CancellationTokenSource();
using Task spinTask = this.Executor.CreateSpinTask(TimeSpan.FromSeconds(0.5), cancellationSource.Token);

spinTask.Start();
try
{
while (spinTask.Status != TaskStatus.Running)
{
Thread.Yield(); // wait for task to be scheduled
}
guardCondition.Trigger();
var exception = Assert.Throws<AggregateException>(() => spinTask.Wait(TimeSpan.FromSeconds(1)));
Assert.That(exception.InnerExceptions, Has.Some.Matches(new Predicate<Exception>(e => e is SimulatedException)));
Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Faulted));
}
finally
{
cancellationSource.Cancel();
try
{
spinTask.Wait();
}
catch (AggregateException e)
{
e.Handle(inner => inner is TaskCanceledException || inner is SimulatedException);
}
}
}

private sealed class SimulatedException : Exception
{
public SimulatedException(string msg) : base(msg)
{ }
}

private sealed class DummyExecutor : HashSet<INode>, IExecutor
{
public bool IsDisposed
Expand All @@ -343,7 +430,7 @@ public bool TryWait(TimeSpan timeout)
}

public void Dispose()
{}
{ }
}
}
}
Loading

0 comments on commit c70ec62

Please sign in to comment.