From 3d85d2991c9003ef33eb4015b3981101d13ec02f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poul=20Kjeldager=20S=C3=B8rensen?= Date: Sun, 18 Aug 2024 17:51:19 +0200 Subject: [PATCH] fix: made the queue propergate when triggering new actions --- .../ActionImplementationMetadata.cs | 24 +++++++------------ src/WorkflowEngine.Core/ActionResult.cs | 13 ++++++++++ .../HangfireWorkflowExecutor.cs | 18 +++++++------- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/WorkflowEngine.Core/ActionImplementationMetadata.cs b/src/WorkflowEngine.Core/ActionImplementationMetadata.cs index 8f79e10..3cdb852 100644 --- a/src/WorkflowEngine.Core/ActionImplementationMetadata.cs +++ b/src/WorkflowEngine.Core/ActionImplementationMetadata.cs @@ -10,6 +10,8 @@ public class ActionImplementationMetadata public string Type { get; set; } public Type Implementation { get; protected set; } + + public static IActionImplementationMetadata FromType(Type type, string name) { var metadata = Activator.CreateInstance(typeof(ActionImplementationMetadata<>).MakeGenericType(type)) as ActionImplementationMetadata; @@ -39,14 +41,10 @@ public async ValueTask ExecuteAsync(IServiceProvider services, IRu var actionResult = await implementation.ExecuteAsync(context, workflow, action); - var result = new ActionResult - { - Key = action.Key, - Status = "Succeded", - Result = actionResult, - DelayNextAction = (implementation is IWaitAction) ? (TimeSpan) actionResult : null - }; - return result; + return ActionResult.Success(action, actionResult,implementation); + + + } } @@ -69,14 +67,8 @@ public async ValueTask ExecuteAsync(IServiceProvider services, IRu var actionResult= await implementation.ExecuteAsync(context, workflow, typedAction); - var result = new ActionResult - { - Key = action.Key, - Status = "Succeded", - Result = actionResult, - DelayNextAction = (implementation is IWaitAction) ? (TimeSpan) actionResult : null - }; - return result; + return ActionResult.Success(action, actionResult, implementation); + } } diff --git a/src/WorkflowEngine.Core/ActionResult.cs b/src/WorkflowEngine.Core/ActionResult.cs index 6f8f0d1..551715f 100644 --- a/src/WorkflowEngine.Core/ActionResult.cs +++ b/src/WorkflowEngine.Core/ActionResult.cs @@ -1,4 +1,5 @@ using System; +using static System.Collections.Specialized.BitVector32; namespace WorkflowEngine.Core { @@ -11,6 +12,18 @@ public class ActionResult : IActionResult public string FailedReason { get; set; } public bool ReThrow { get; set; } public TimeSpan? DelayNextAction { get; set; } + + public static ActionResult Success(IAction action, object actionResult, object implementation) + { + var result = new ActionResult + { + Key = action.Key, + Status = "Succeded", + Result = actionResult, + DelayNextAction = (implementation is IWaitAction) ? (TimeSpan) actionResult : null + }; + return result; + } } diff --git a/src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs b/src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs index 665e8ba..fee97a4 100644 --- a/src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs +++ b/src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs @@ -86,10 +86,10 @@ public HangfireWorkflowExecutor(IWorkflowAccessor workflowAccessor, IHangfireAct public async ValueTask ExecuteAsync(IRunContext run, IWorkflow workflow, IAction action, PerformContext context) { try - { + { runContextAccessor.RunContext = run; arrayContext.JobId = context.BackgroundJob.Id; - + var queue = context.BackgroundJob.Job.Queue ?? "default"; var result = await actionExecutor.ExecuteAsync(run, workflow, action); @@ -109,12 +109,12 @@ public async ValueTask ExecuteAsync(IRunContext run, IWorkflow workflow, if (result.DelayNextAction.HasValue) { - var workflowRunId = backgroundJobClient.Schedule( + var workflowRunId = backgroundJobClient.Schedule(queue, (executor) => executor.ExecuteAsync(run, workflow, next, null),result.DelayNextAction.Value); } else { - var workflowRunId = backgroundJobClient.Enqueue( + var workflowRunId = backgroundJobClient.Enqueue(queue, (executor) => executor.ExecuteAsync(run, workflow, next, null)); } @@ -133,14 +133,14 @@ public async ValueTask ExecuteAsync(IRunContext run, IWorkflow workflow, if (result.DelayNextAction != null) { - var workflowRunId = backgroundJobClient.Schedule( + var workflowRunId = backgroundJobClient.Schedule(queue, (executor) => executor.ExecuteAsync(run, workflow, scopeAction, null),result.DelayNextAction.Value); } else { - var workflowRunId = backgroundJobClient.Enqueue( + var workflowRunId = backgroundJobClient.Enqueue(queue, (executor) => executor.ExecuteAsync(run, workflow, scopeAction, null)); } //await outputRepository.EndScope(run, workflow, action); @@ -177,8 +177,8 @@ public async ValueTask TriggerAsync(ITriggerContext triggerContext) /// /// public async ValueTask TriggerAsync(ITriggerContext triggerContext, PerformContext context = null) - { - + { + var queue = context.BackgroundJob.Job.Queue ?? "default"; triggerContext.RunId = triggerContext.RunId == Guid.Empty ? Guid.NewGuid() : triggerContext.RunId; triggerContext.JobId = context?.BackgroundJob.Id; @@ -187,7 +187,7 @@ public async ValueTask TriggerAsync(ITriggerContext triggerContext, Perf if (action != null) { - var a = backgroundJobClient.Enqueue( + var a = backgroundJobClient.Enqueue(queue, (executor) => executor.ExecuteAsync(triggerContext, triggerContext.Workflow, action, null)); } return action;