Skip to content

Commit

Permalink
implement type-based solution
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmrdavid committed Sep 13, 2024
1 parent a335871 commit f2942b2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ internal bool TryGetOrchestrationErrorDetails(out Exception? failure)
return hasError;
}

internal void SetResult(IEnumerable<OrchestratorAction> actions, string customStatus)
internal void TrySetResult(IEnumerable<OrchestratorAction> actions, string customStatus)
{
var result = new OrchestratorExecutionResult
{
CustomStatus = customStatus,
Actions = actions,
};

this.SetResultInternal(result);
this.TrySetResultInternal(result);
}

// TODO: This method should be considered deprecated because SDKs should no longer be returning results as JSON.
Expand Down Expand Up @@ -117,10 +117,39 @@ internal void SetResult(string orchestratorResponseJsonText)
innerException: jsonReaderException);
}

this.SetResultInternal(result);
this.TrySetResultInternal(result);
}

private void SetResultInternal(OrchestratorExecutionResult result)
/// <summary>
/// Recursively inspect the FailureDetails of the failed orchestrator and throw if a platform-level exception is detected.
/// </summary>
/// <remarks>
/// Today, this method only checks for <see cref="OutOfMemoryException"/>. In the future, we may want to add more cases.
/// Other known platform-level exceptions, like timeouts or process exists due to `Environment.FailFast`, do not yield
/// a `OrchestratorExecutionResult` as the isolated invocation is abruptly terminated. Therefore, they don't need to be
/// handled in this method.
/// However, our tests reveal that OOMs are, surprisngly, caught and returned as a `OrchestratorExecutionResult`
/// by the isolated process, and thus need special handling.
/// It's unclear if all OOMs are caught by the isolated process (probably not), and also if there are other platform-level
/// errors that are also caught in the isolated process and returned as a `OrchestratorExecutionResult`. Let's add them
/// to this method as we encounter them.
/// </remarks>
/// <param name="failureDetails">The failure details of the orchestrator.</param>
/// <exception cref="OutOfMemoryException">If an OOM error is detected.</exception>
private void ThrowIfPlatformLevelException(FailureDetails failureDetails)
{
if (failureDetails.InnerFailure?.IsCausedBy<OutOfMemoryException>() ?? false)
{
throw new OutOfMemoryException(failureDetails.ErrorMessage);
}

if (failureDetails.InnerFailure != null)
{
this.ThrowIfPlatformLevelException(failureDetails.InnerFailure);
}
}

private void TrySetResultInternal(OrchestratorExecutionResult result)
{
// Look for an orchestration completion action to see if we need to grab the output.
foreach (OrchestratorAction action in result.Actions)
Expand All @@ -133,6 +162,14 @@ private void SetResultInternal(OrchestratorExecutionResult result)

if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed)
{
// If the orchestrator failed due to a platform-level error in the isolated process,
// we should re-throw that exception in the host (this process) invocation pipeline,
// so the invocation can be retried.
if (completeAction.FailureDetails != null)
{
this.ThrowIfPlatformLevelException(completeAction.FailureDetails);
}

string message = completeAction switch
{
{ FailureDetails: { } f } => f.ErrorMessage,
Expand Down
52 changes: 21 additions & 31 deletions src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,15 @@ await this.LifeCycleNotificationHelper.OrchestratorStartingAsync(
byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue);
P.OrchestratorResponse response = P.OrchestratorResponse.Parser.ParseFrom(triggerReturnValueBytes);
context.SetResult(
// TrySetResult may throw if a platform-level error is encountered (like an out of memory exception).
context.TrySetResult(
response.Actions.Select(ProtobufUtils.ToOrchestratorAction),
response.CustomStatus);
// Here we throw if the orchestrator completed with an application-level error. When we do this,
// the function's result type will be of type `OrchestrationFailureException` which is reserved
// for application-level errors that do not need to be re-tried.
context.ThrowIfFailed();
},
#pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use)
Expand All @@ -159,6 +164,20 @@ await this.LifeCycleNotificationHelper.OrchestratorStartingAsync(
// Re-throw so we can abort this invocation.
this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
}

// we abort the invocation on "platform level errors" such as:
// - a timeout
// - an out of memory exception
// - a worker process exit
if (functionResult.Exception is Host.FunctionTimeoutException
// see in RemoteOrchestrationContext.TrySetResultInternal for details on OOM-handling

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / build

Check warning on line 173 in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs

View workflow job for this annotation

GitHub Actions / build

|| functionResult.Exception?.InnerException is OutOfMemoryException
|| (functionResult.Exception?.InnerException?.GetType().ToString().Contains("WorkerProcessExitException") ?? false))
{
// TODO: the `WorkerProcessExitException` type is not exposed in our dependencies, it's part of WebJobs.Host.Script.
// Should we add that dependency or should it be exposed in WebJobs.Host?
throw functionResult.Exception;
}
}
catch (Exception hostRuntimeException)
{
Expand Down Expand Up @@ -214,8 +233,7 @@ await this.LifeCycleNotificationHelper.OrchestratorCompletedAsync(
isReplay: false);
}
}
else if (context.TryGetOrchestrationErrorDetails(out Exception? exception)
&& (exception?.GetType() != typeof(OrchestrationFailureException) || !exception.Message.Contains("OutOfMemoryException")))
else if (context.TryGetOrchestrationErrorDetails(out Exception? exception))
{
// the function failed because the orchestrator failed.

Expand All @@ -236,20 +254,6 @@ await this.LifeCycleNotificationHelper.OrchestratorFailedAsync(
exception?.Message ?? string.Empty,
isReplay: false);
}
else if (exception?.GetType() == typeof(OrchestrationFailureException) && exception.Message.Contains("OutOfMemoryException"))
{
string reason = $"Out Of Memory exception thrown by the worker runtime: {exception}";

this.TraceHelper.FunctionAborted(
this.Options.HubName,
functionName.Name,
instance.InstanceId,
reason,
functionType: FunctionType.Orchestrator);

// This will abort the current execution and force an durable retry
throw new SessionAbortedException(reason);
}
else
{
// the function failed for some other reason
Expand Down Expand Up @@ -571,20 +575,6 @@ public async Task CallActivityAsync(DispatchMiddlewareContext dispatchContext, F
result: serializedOutput),
};
}
else if (result.Exception is not null && result.Exception.Message.Contains("OutOfMemoryException"))
{
string reason = $"Out Of Memory exception thrown by the worker runtime: {result.Exception}";

this.TraceHelper.FunctionAborted(
this.Options.HubName,
functionName.Name,
instance.InstanceId,
reason,
functionType: FunctionType.Activity);

// This will abort the current execution and force an durable retry
throw new SessionAbortedException(reason);
}
else
{
this.TraceHelper.FunctionFailed(
Expand Down

0 comments on commit f2942b2

Please sign in to comment.