From 41fa8255fb16621e0fb41fd7c2ae3583bc84afe4 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Tue, 7 May 2024 07:48:02 -0500 Subject: [PATCH] Added workflow concurrency example for .NET Signed-off-by: Whit Waldo --- .../workflow/workflow-patterns.md | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index f7865f55e9c..b15963f7fe2 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -586,7 +586,60 @@ The key takeaways from this example are: - The number of parallel tasks can be static or dynamic - The workflow itself is capable of aggregating the results of parallel executions -While not shown in the example, it's possible to go further and limit the degree of concurrency using simple, language-specific constructs. Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks. +Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks. + +It's possible to go further and limit the degree of concurrency using simple, language-specific constructs. + +{{< tabs ".NET" >}} + +{{% codetab %}} + +```csharp +public static class TaskExtensions +{ + public static async Task> WhenAllWithLimitAsync(this IEnumerable>> tasks, int maxDegreeOfParallelism) + { + var results = new List(); + var inFlight = new HashSet>(); + foreach (var task in tasks) + { + if (inFlight.Count > maxParallelism) + { + var finishedTask = await Task.WhenAny(inFlight); + results.Add(finishedTask.Result); + inFlight.Remove(finishedTask); + } + + inFlight.Add(context.CallActivityAsync(task)) + } + + //Wait for all the remaining tasks to complete + await Task.WhenAll(inFlight); + } +} + +//Revisiting the earlier example... +// Get a list of N work items to process in parallel. +object[] workBatch = await context.CallActivityAsync("GetWorkBatch", null); + +// Schedule the parallel tasks, but don't wait for them to complete yet +var parallelTasks = new List>(); +foreach (var workItem in workBatch) +{ + var task = context.CallActivityAsync("ProcessWorkItem", workItem); + parallelTasks.Add(task); +} + +// This is where we diverge from the previous example and use our new extension method above to limit concurrency to 5 tasks at a time +var result = await Task.WhenAllWithLimitAsync(parallelTasks, 5); + +var sum = result.Sum(t => t); +await context.CallActivityAsync("PostResults", sum); +``` + +{{% /codetab %}} + +{{< /tabs >}} ## Async HTTP APIs