Skip to content

Commit

Permalink
Added workflow concurrency example for .NET
Browse files Browse the repository at this point in the history
Signed-off-by: Whit Waldo <[email protected]>
  • Loading branch information
WhitWaldo committed May 7, 2024
1 parent 47dad55 commit 41fa825
Showing 1 changed file with 54 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,60 @@ The key takeaways from this example are:
- The number of parallel tasks can be static or dynamic
- The workflow itself is capable of aggregating the results of parallel executions
While not shown in the example, it's possible to go further and limit the degree of concurrency using simple, language-specific constructs. Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks.
Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks.
It's possible to go further and limit the degree of concurrency using simple, language-specific constructs.
{{< tabs ".NET" >}}
{{% codetab %}}
<!-- .NET -->
```csharp
public static class TaskExtensions
{
public static async Task<IEnumerable<T>> WhenAllWithLimitAsync<T>(this IEnumerable<Task<T>>> tasks, int maxDegreeOfParallelism)
{
var results = new List<T>();
var inFlight = new HashSet<Task<T>>();
foreach (var task in tasks)
{
if (inFlight.Count > maxParallelism)
{
var finishedTask = await Task.WhenAny(inFlight);
results.Add(finishedTask.Result);
inFlight.Remove(finishedTask);
}
inFlight.Add(context.CallActivityAsync(task))
}
//Wait for all the remaining tasks to complete
await Task.WhenAll(inFlight);
}
}
//Revisiting the earlier example...
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);
// Schedule the parallel tasks, but don't wait for them to complete yet
var parallelTasks = new List<Task<int>>();
foreach (var workItem in workBatch)
{
var task = context.CallActivityAsync<int>("ProcessWorkItem", workItem);
parallelTasks.Add(task);
}
// This is where we diverge from the previous example and use our new extension method above to limit concurrency to 5 tasks at a time
var result = await Task.WhenAllWithLimitAsync(parallelTasks, 5);
var sum = result.Sum(t => t);
await context.CallActivityAsync("PostResults", sum);
```
{{% /codetab %}}
{{< /tabs >}}
## Async HTTP APIs
Expand Down

0 comments on commit 41fa825

Please sign in to comment.