Skip to content

Commit

Permalink
fix: made the queue propergate when triggering new actions
Browse files Browse the repository at this point in the history
  • Loading branch information
pksorensen committed Aug 18, 2024
1 parent fa4d37e commit 3d85d29
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 25 deletions.
24 changes: 8 additions & 16 deletions src/WorkflowEngine.Core/ActionImplementationMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class ActionImplementationMetadata
public string Type { get; set; }
public Type Implementation { get; protected set; }



public static IActionImplementationMetadata FromType(Type type, string name)
{
var metadata = Activator.CreateInstance(typeof(ActionImplementationMetadata<>).MakeGenericType(type)) as ActionImplementationMetadata;
Expand Down Expand Up @@ -39,14 +41,10 @@ public async ValueTask<ActionResult> ExecuteAsync(IServiceProvider services, IRu

var actionResult = await implementation.ExecuteAsync(context, workflow, action);

var result = new ActionResult
{
Key = action.Key,
Status = "Succeded",
Result = actionResult,
DelayNextAction = (implementation is IWaitAction) ? (TimeSpan) actionResult : null
};
return result;
return ActionResult.Success(action, actionResult,implementation);



}
}

Expand All @@ -69,14 +67,8 @@ public async ValueTask<ActionResult> ExecuteAsync(IServiceProvider services, IRu

var actionResult= await implementation.ExecuteAsync(context, workflow, typedAction);

var result = new ActionResult
{
Key = action.Key,
Status = "Succeded",
Result = actionResult,
DelayNextAction = (implementation is IWaitAction) ? (TimeSpan) actionResult : null
};
return result;
return ActionResult.Success(action, actionResult, implementation);

}
}

Expand Down
13 changes: 13 additions & 0 deletions src/WorkflowEngine.Core/ActionResult.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using static System.Collections.Specialized.BitVector32;

namespace WorkflowEngine.Core
{
Expand All @@ -11,6 +12,18 @@ public class ActionResult : IActionResult
public string FailedReason { get; set; }
public bool ReThrow { get; set; }
public TimeSpan? DelayNextAction { get; set; }

public static ActionResult Success(IAction action, object actionResult, object implementation)
{
var result = new ActionResult
{
Key = action.Key,
Status = "Succeded",
Result = actionResult,
DelayNextAction = (implementation is IWaitAction) ? (TimeSpan) actionResult : null
};
return result;
}
}


Expand Down
18 changes: 9 additions & 9 deletions src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public HangfireWorkflowExecutor(IWorkflowAccessor workflowAccessor, IHangfireAct
public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow, IAction action, PerformContext context)
{
try
{
{
runContextAccessor.RunContext = run;
arrayContext.JobId = context.BackgroundJob.Id;

var queue = context.BackgroundJob.Job.Queue ?? "default";

var result = await actionExecutor.ExecuteAsync(run, workflow, action);

Expand All @@ -109,12 +109,12 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,
if (result.DelayNextAction.HasValue)
{

var workflowRunId = backgroundJobClient.Schedule<IHangfireActionExecutor>(
var workflowRunId = backgroundJobClient.Schedule<IHangfireActionExecutor>(queue,
(executor) => executor.ExecuteAsync(run, workflow, next, null),result.DelayNextAction.Value);
}
else
{
var workflowRunId = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
var workflowRunId = backgroundJobClient.Enqueue<IHangfireActionExecutor>(queue,
(executor) => executor.ExecuteAsync(run, workflow, next, null));
}

Expand All @@ -133,14 +133,14 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,
if (result.DelayNextAction != null)
{

var workflowRunId = backgroundJobClient.Schedule<IHangfireActionExecutor>(
var workflowRunId = backgroundJobClient.Schedule<IHangfireActionExecutor>(queue,
(executor) => executor.ExecuteAsync(run, workflow, scopeAction, null),result.DelayNextAction.Value);
}
else
{


var workflowRunId = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
var workflowRunId = backgroundJobClient.Enqueue<IHangfireActionExecutor>(queue,
(executor) => executor.ExecuteAsync(run, workflow, scopeAction, null));
}
//await outputRepository.EndScope(run, workflow, action);
Expand Down Expand Up @@ -177,8 +177,8 @@ public async ValueTask<object> TriggerAsync(ITriggerContext triggerContext)
/// <param name="triggerContext"></param>
/// <returns></returns>
public async ValueTask<object> TriggerAsync(ITriggerContext triggerContext, PerformContext context = null)
{

{
var queue = context.BackgroundJob.Job.Queue ?? "default";
triggerContext.RunId = triggerContext.RunId == Guid.Empty ? Guid.NewGuid() : triggerContext.RunId;
triggerContext.JobId = context?.BackgroundJob.Id;

Expand All @@ -187,7 +187,7 @@ public async ValueTask<object> TriggerAsync(ITriggerContext triggerContext, Perf

if (action != null)
{
var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(queue,
(executor) => executor.ExecuteAsync(triggerContext, triggerContext.Workflow, action, null));
}
return action;
Expand Down

0 comments on commit 3d85d29

Please sign in to comment.