Added example for limiting workflow concurrency in .NET fan-in/out example #4132

Merged 8 commits on May 9, 2024

@@ -586,7 +586,64 @@ The key takeaways from this example are:
- The number of parallel tasks can be static or dynamic
- The workflow itself is capable of aggregating the results of parallel executions

Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks.

It's possible to go further and limit the degree of concurrency using simple, language-specific constructs.

{{< tabs ".NET" >}}

{{% codetab %}}
<!-- .NET -->
```csharp
public static class TaskExtensions
{
    public static async Task<IEnumerable<T>> WhenAllWithLimitAsync<T>(this IEnumerable<Task<T>> tasks, int maxDegreeOfParallelism)
    {
        var results = new List<T>();
        var inFlight = new HashSet<Task<T>>();

        // Note: `tasks` should be a lazily-evaluated sequence (for example, one built with
        // LINQ's Select) so that each task is only started as it is enumerated here.
        foreach (var task in tasks)
        {
            if (inFlight.Count >= maxDegreeOfParallelism)
            {
                // Wait for one of the in-flight tasks to finish before starting another
                var finishedTask = await Task.WhenAny(inFlight);
                results.Add(finishedTask.Result);
                inFlight.Remove(finishedTask);
            }

            inFlight.Add(task);
        }

        // Wait for all the remaining tasks to complete
        results.AddRange(await Task.WhenAll(inFlight));
        return results;
    }
}
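// One possible way to use the helper above (a sketch, not part of the original sample):
// build the activity tasks lazily, for example with LINQ's Select, so that each activity
// is only scheduled as the helper enumerates it, then cap the number in flight:
//
//     var throttledResults = await workBatch
//         .Select(item => context.CallActivityAsync<int>("ProcessWorkItem", item))
//         .WhenAllWithLimitAsync(maxDegreeOfParallelism: 5);
//
// The inline loop below achieves the same effect without the extension method.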

// Revisiting the earlier example...
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

const int MaxParallelism = 5;
var results = new List<int>();
```
Contributor:

Performance nit. Suggested change:

```diff
-var results = new List<int>();
+var results = new List<int>(capacity: workBatch.Length);
```

Contributor Author:

I was actually quite curious about this, as I've seen it proposed elsewhere, so I wrote up a small benchmark to test it out. WithListSize is your proposal and WithoutListSize is the original. I tested with sets of 10, 100, 1000, and 10000 numbers in the list.

My benchmarking code:

```csharp
[MemoryDiagnoser]
public class ListComparisonTest
{
    private static readonly Random _random = new();
    private static List<int> _numbers = [];

    [Params(10, 100, 1000, 10000)]
    public int N = 1000;

    [GlobalSetup]
    public void Setup()
    {
        _numbers = [];
        for (var a = 0; a < N; a++)
        {
            _numbers.Add(_random.Next(1, 100));
        }
    }

    [Benchmark]
    public void WithListSize()
    {
        var newList = new List<int>(_numbers.Count);
        newList.AddRange(_numbers);
    }

    [Benchmark]
    public void WithoutListSize()
    {
        var newList = new List<int>();
        newList.AddRange(_numbers);
    }
}

```

And the summary:

```
BenchmarkDotNet v0.13.12, Windows 10 (10.0.19045.4291/22H2/2022Update)
AMD Ryzen Threadripper 1950X, 1 CPU, 32 logical and 16 physical cores
.NET SDK 8.0.300-preview.24203.14
  [Host]     : .NET 8.0.4 (8.0.424.16909), X64 RyuJIT AVX2
  DefaultJob : .NET 8.0.4 (8.0.424.16909), X64 RyuJIT AVX2
```

| Method          | N     | Mean        | Error      | StdDev     | Gen0   | Gen1   | Allocated |
|-----------------|-------|-------------|------------|------------|--------|--------|-----------|
| WithListSize    | 10    | 39.94 ns    | 0.814 ns   | 1.243 ns   | 0.0006 | -      | 96 B      |
| WithoutListSize | 10    | 38.96 ns    | 0.800 ns   | 1.315 ns   | 0.0006 | -      | 96 B      |
| WithListSize    | 100   | 125.76 ns   | 2.475 ns   | 3.627 ns   | 0.0026 | -      | 456 B     |
| WithoutListSize | 100   | 125.06 ns   | 2.491 ns   | 4.229 ns   | 0.0026 | -      | 456 B     |
| WithListSize    | 1000  | 1,010.31 ns | 19.634 ns  | 29.387 ns  | 0.0248 | -      | 4056 B    |
| WithoutListSize | 1000  | 1,002.86 ns | 19.489 ns  | 30.343 ns  | 0.0248 | -      | 4056 B    |
| WithListSize    | 10000 | 9,446.83 ns | 185.874 ns | 272.452 ns | 0.2594 | 0.0305 | 40056 B   |
| WithoutListSize | 10000 | 9,287.98 ns | 185.632 ns | 190.630 ns | 0.2594 | 0.0305 | 40056 B   |

In conclusion, the allocations are the same either way, but it's ever so slightly faster to stick with new List<int>() over new List<int>(N).

Contributor:

Could it be because you're using AddRange in your benchmark instead of Add? Using a for-loop in the benchmark that makes individual Add calls might be more representative.
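(For reference, here is a minimal sketch of the per-element comparison being suggested; it is not part of the thread, and the class and member names are illustrative. AddRange from another collection can pre-size the destination list, which would mask any benefit from setting the capacity up front, whereas per-element Add exercises the list's growth path.)

```csharp
using System.Collections.Generic;
using BenchmarkDotNet.Attributes;

// Hypothetical benchmark illustrating the reviewer's suggestion: compare a
// pre-sized list against an empty one using individual Add calls.
[MemoryDiagnoser]
public class AddLoopComparison
{
    [Params(10, 1000, 10000)]
    public int N;

    [Benchmark]
    public List<int> WithCapacity()
    {
        // Capacity set up front, so the backing array never needs to be resized while adding
        var list = new List<int>(capacity: N);
        for (var i = 0; i < N; i++)
        {
            list.Add(i);
        }
        return list;
    }

    [Benchmark]
    public List<int> WithoutCapacity()
    {
        // Starts empty and grows (reallocating its backing array) as items are added
        var list = new List<int>();
        for (var i = 0; i < N; i++)
        {
            list.Add(i);
        }
        return list;
    }
}
```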

Contributor Author:

That's fair. I rewrote the benchmark to be nearly identical to the code here:

```csharp
[MemoryDiagnoser]
public class TaskTest
{
    private static readonly Random _random = new();
    private static List<int> _numbers = [];

    [Params(10, 100, 1000, 10000)]
    public int N = 1000;

    [GlobalSetup]
    public void Setup()
    {
        _numbers = [];
        for (var a = 0; a < N; a++)
        {
            _numbers.Add(_random.Next(1, 100));
        }
    }
    
    [Benchmark]
    public async Task InitializeEmptyList()
    {
        const int maxParallelism = 5;
        var results = new List<int>();
        var inFlightTasks = new HashSet<Task<int>>();
        foreach (var workItem in _numbers)
        {
            if (inFlightTasks.Count >= maxParallelism)
            {
                var finishedTask = await Task.WhenAny(inFlightTasks);
                results.Add(finishedTask.Result);
                inFlightTasks.Remove(finishedTask);
            }

            inFlightTasks.Add(Task.Run(() => workItem + 1));
        }

        results.AddRange(await Task.WhenAll(inFlightTasks));
        var sum = results.Sum(t => t);
    }

    [Benchmark]
    public async Task InitializeListSize()
    {
        const int maxParallelism = 5;
        var results = new List<int>();
        var inFlightTasks = new HashSet<Task<int>>(_numbers.Count);
        foreach (var workItem in _numbers)
        {
            if (inFlightTasks.Count >= maxParallelism)
            {
                var finishedTask = await Task.WhenAny(inFlightTasks);
                results.Add(finishedTask.Result);
                inFlightTasks.Remove(finishedTask);
            }

            inFlightTasks.Add(Task.Run(() => workItem + 1));
        }

        results.AddRange(await Task.WhenAll(inFlightTasks));
        var sum = results.Sum(t => t);
    }
}

```

I've got to say, I've run this test a dozen times with different iterations, and from a timing perspective the results are mixed. From a memory-allocation perspective, with the exception of small list counts (sub-40 items), the allocated memory is less with an empty list than when setting the size during initialization. In the latest iteration, below, the empty list was ever so slightly slower than the initialized-size approach, but the speed has flip-flopped back and forth each time I've run it (perhaps it's the use of the async tasks that's fiddling with the speed, who knows). Either way, with allocations being the only consistent difference between the two, I don't think the performance benefit is there to justify specifying the size at initialization:

Round 1

```
BenchmarkDotNet v0.13.12, Windows 10 (10.0.19045.4291/22H2/2022Update)
AMD Ryzen Threadripper 1950X, 1 CPU, 32 logical and 16 physical cores
.NET SDK 8.0.300-preview.24203.14
  [Host]     : .NET 8.0.4 (8.0.424.16909), X64 RyuJIT AVX2 [AttachedDebugger]
  DefaultJob : .NET 8.0.4 (8.0.424.16909), X64 RyuJIT AVX2
```

| Method              | N     | Mean         | Error      | StdDev     | Gen0    | Allocated  |
|---------------------|-------|--------------|------------|------------|---------|------------|
| InitializeEmptyList | 10    | 37.50 us     | 0.748 us   | 1.956 us   | -       | 6.31 KB    |
| InitializeListSize  | 10    | 36.70 us     | 0.731 us   | 2.050 us   | -       | 6.28 KB    |
| InitializeEmptyList | 40    | 138.56 us    | 2.732 us   | 5.703 us   | -       | 24.7 KB    |
| InitializeListSize  | 40    | 141.56 us    | 2.811 us   | 4.032 us   | -       | 25.65 KB   |
| InitializeEmptyList | 75    | 251.37 us    | 4.848 us   | 6.132 us   | -       | 46.29 KB   |
| InitializeListSize  | 75    | 244.88 us    | 4.851 us   | 9.229 us   | -       | 47.93 KB   |
| InitializeEmptyList | 100   | 329.68 us    | 6.467 us   | 8.178 us   | -       | 61.27 KB   |
| InitializeListSize  | 100   | 321.43 us    | 6.428 us   | 15.645 us  | -       | 63.15 KB   |
| InitializeEmptyList | 250   | 750.08 us    | 14.712 us  | 28.695 us  | -       | 152.47 KB  |
| InitializeListSize  | 250   | 726.80 us    | 14.371 us  | 29.031 us  | -       | 158.12 KB  |
| InitializeEmptyList | 500   | 1,363.58 us  | 25.403 us  | 48.943 us  | -       | 303.71 KB  |
| InitializeListSize  | 500   | 1,321.89 us  | 26.422 us  | 29.368 us  | -       | 314.1 KB   |
| InitializeEmptyList | 1000  | 2,551.46 us  | 50.880 us  | 108.429 us | -       | 606.43 KB  |
| InitializeListSize  | 1000  | 2,479.43 us  | 48.604 us  | 57.860 us  | 3.9063  | 628.3 KB   |
| InitializeEmptyList | 10000 | 20,781.63 us | 412.799 us | 861.664 us | 31.2500 | 6091.7 KB  |
| InitializeListSize  | 10000 | 21,037.86 us | 414.723 us | 704.231 us | 31.2500 | 6282.6 KB  |

Round 2

| Method              | N     | Mean         | Error      | StdDev     | Median       | Gen0    | Allocated  |
|---------------------|-------|--------------|------------|------------|--------------|---------|------------|
| InitializeEmptyList | 10    | 37.57 us     | 0.828 us   | 2.428 us   | 36.86 us     | -       | 6.31 KB    |
| InitializeListSize  | 10    | 37.17 us     | 0.742 us   | 1.994 us   | 36.69 us     | -       | 6.29 KB    |
| InitializeEmptyList | 40    | 137.78 us    | 2.697 us   | 3.868 us   | 137.77 us    | -       | 24.75 KB   |
| InitializeListSize  | 40    | 140.26 us    | 2.765 us   | 6.571 us   | 139.10 us    | -       | 25.58 KB   |
| InitializeEmptyList | 75    | 246.97 us    | 4.855 us   | 10.026 us  | 245.65 us    | -       | 46.23 KB   |
| InitializeListSize  | 75    | 250.40 us    | 4.928 us   | 9.134 us   | 247.48 us    | -       | 47.84 KB   |
| InitializeEmptyList | 100   | 327.20 us    | 6.367 us   | 10.281 us  | 325.72 us    | -       | 61.37 KB   |
| InitializeListSize  | 100   | 328.35 us    | 6.503 us   | 14.410 us  | 323.16 us    | -       | 63.33 KB   |
| InitializeEmptyList | 250   | 744.40 us    | 14.676 us  | 24.922 us  | 734.19 us    | -       | 152.18 KB  |
| InitializeListSize  | 250   | 731.99 us    | 14.580 us  | 26.291 us  | 723.94 us    | 0.9766  | 157.96 KB  |
| InitializeEmptyList | 500   | 1,357.14 us  | 27.141 us  | 61.815 us  | 1,341.14 us  | -       | 304.02 KB  |
| InitializeListSize  | 500   | 1,305.45 us  | 26.103 us  | 55.060 us  | 1,302.53 us  | -       | 313.86 KB  |
| InitializeEmptyList | 1000  | 2,541.60 us  | 50.459 us  | 126.592 us | 2,515.37 us  | -       | 606.98 KB  |
| InitializeListSize  | 1000  | 2,411.67 us  | 47.206 us  | 87.500 us  | 2,426.39 us  | -       | 627.31 KB  |
| InitializeEmptyList | 10000 | 20,978.93 us | 408.484 us | 598.750 us | 21,025.14 us | 31.2500 | 6086.05 KB |
| InitializeListSize  | 10000 | 20,665.67 us | 409.248 us | 573.708 us | 20,716.74 us | 31.2500 | 6286 KB    |

```csharp
// ...continuing the workflow example from above
var inFlightTasks = new HashSet<Task<int>>();
foreach (var workItem in workBatch)
{
    if (inFlightTasks.Count >= MaxParallelism)
    {
        // Throttle: wait for one in-flight activity to finish before scheduling another
        var finishedTask = await Task.WhenAny(inFlightTasks);
        results.Add(finishedTask.Result);
        inFlightTasks.Remove(finishedTask);
    }

    inFlightTasks.Add(context.CallActivityAsync<int>("ProcessWorkItem", workItem));
}

// Collect the results of the remaining in-flight activities
results.AddRange(await Task.WhenAll(inFlightTasks));

var sum = results.Sum(t => t);
await context.CallActivityAsync("PostResults", sum);
```

{{% /codetab %}}

{{< /tabs >}}

## Async HTTP APIs
