diff --git a/internal/error.go b/internal/error.go index a00656202..e6de00e78 100644 --- a/internal/error.go +++ b/internal/error.go @@ -113,7 +113,7 @@ type ( // PanicError contains information about panicked workflow/activity. PanicError struct { - value string + value interface{} stackTrace string } @@ -296,12 +296,12 @@ func (e *CanceledError) Details(d ...interface{}) error { } func newPanicError(value interface{}, stackTrace string) *PanicError { - return &PanicError{value: fmt.Sprintf("%v", value), stackTrace: stackTrace} + return &PanicError{value: value, stackTrace: stackTrace} } // Error from error interface func (e *PanicError) Error() string { - return e.value + return fmt.Sprintf("%v", e.value) } // StackTrace return stack trace of the panic diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index 97938fadf..b75dc2bec 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -111,6 +111,11 @@ type ( scheduledEventIDToCancellationID map[int64]string scheduledEventIDToSignalID map[int64]string } + + // stateMachineIllegalStatePanic is the value passed to panic when the decision state machine detects an illegal state + stateMachineIllegalStatePanic struct { + message string + } ) const ( @@ -306,9 +311,17 @@ func (d *decisionStateMachineBase) moveState(newState decisionState, event strin } } +func (d stateMachineIllegalStatePanic) String() string { + return d.message +} + +func panicIllegalState(message string) { + panic(stateMachineIllegalStatePanic{message: message}) +} + func (d *decisionStateMachineBase) failStateTransition(event string) { // this is when we detect illegal state transition, likely due to ill history sequence or nondeterministic decider code - panic(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d)) + panicIllegalState(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d)) } func (d *decisionStateMachineBase) handleDecisionSent() { @@ -660,7 +673,7 @@ func (h 
*decisionsHelper) getDecision(id decisionID) decisionStateMachine { if !ok { panicMsg := fmt.Sprintf("unknown decision %v, possible causes are nondeterministic workflow definition code"+ " or incompatible change in the workflow definition", id) - panic(panicMsg) + panicIllegalState(panicMsg) } // Move the last update decision state machine to the back of the list. // Otherwise decisions (like timer cancellations) can end up out of order. @@ -671,7 +684,7 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine { func (h *decisionsHelper) addDecision(decision decisionStateMachine) { if _, ok := h.decisions[decision.getID()]; ok { panicMsg := fmt.Sprintf("adding duplicate decision %v", decision) - panic(panicMsg) + panicIllegalState(panicMsg) } element := h.orderedDecisions.PushBack(decision) h.decisions[decision.getID()] = element @@ -730,12 +743,12 @@ func (h *decisionsHelper) getActivityID(event *s.HistoryEvent) string { case s.EventTypeActivityTaskTimedOut: scheduledEventID = event.ActivityTaskTimedOutEventAttributes.GetScheduledEventId() default: - panic(fmt.Sprintf("unexpected event type %v", event.GetEventType())) + panicIllegalState(fmt.Sprintf("unexpected event type %v", event.GetEventType())) } activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID] if !ok { - panic(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event))) + panicIllegalState(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event))) } return activityID } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index edd852b0c..9a1733f0a 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -719,6 +719,14 @@ ProcessEvents: } } + // A non-deterministic error can be detected in 2 different places: + // 1) the replay decisions do not match the history events. 
This is usually due to a non-backwards-compatible code + // change to decider logic. For example, changing a call from one activity to a different activity. + // 2) the decision state machine tries to make an illegal state transition while replaying a history event (like + // activity task completed), but the corresponding decider code that starts the event has been removed. In that case + // the replay of that event will panic on the decision state machine and the workflow will be marked as completed + // with the panic error. + var nonDeterministicErr error if !skipReplayCheck && !w.isWorkflowCompleted { // check if decisions from reply matches to the history events if err := matchReplayWithHistory(replayDecisions, respondEvents); err != nil { @@ -728,29 +736,32 @@ ProcessEvents: zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()), zap.String(tagRunID, task.WorkflowExecution.GetRunId()), zap.Error(err)) - - // Whether or not we store the error in workflowContext.err makes - // a significant difference, to the point that it affects client's observable - // behavior as far as handling non-deterministic workflows. - // - // If we store it in workflowContext.err, the decision task completion code - // will pick up the error and correctly wrap it in the response request we sent back - // to the server, which in this case will contain a request to fail the workflow. - // - // If we simply return the error, the decision task completion code path will not - // execute at all, therefore, no response is sent back to the server and we will - // look like a decision task time out. 
- switch w.wth.nonDeterministicWorkflowPolicy { - case NonDeterministicWorkflowPolicyFailWorkflow: - eventHandler.Complete(nil, NewCustomError("nondeterministic workflow", err.Error())) - case NonDeterministicWorkflowPolicyBlockWorkflow: - return nil, err - default: - panic(fmt.Sprintf("unknown mismatched workflow history policy.")) + nonDeterministicErr = err + } + } + if nonDeterministicErr == nil && w.err != nil { + if panicErr, ok := w.err.(*PanicError); ok && panicErr.value != nil { + if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic { + nonDeterministicErr = panicErr } } } + if nonDeterministicErr != nil { + switch w.wth.nonDeterministicWorkflowPolicy { + case NonDeterministicWorkflowPolicyFailWorkflow: + // completing the workflow with a custom error will fail the workflow + eventHandler.Complete(nil, NewCustomError("NonDeterministicWorkflowPolicyFailWorkflow", nonDeterministicErr.Error())) + case NonDeterministicWorkflowPolicyBlockWorkflow: + // returning the error here will be converted to DecisionTaskFailed the first time, and ignored for subsequent + // attempts, which will cause DecisionTaskTimeout; the server will retry forever until the issue is fixed or + // the workflow times out. 
+ return nil, nonDeterministicErr + default: + panic(fmt.Sprintf("unknown mismatched workflow history policy.")) + } + } + return w.CompleteDecisionTask(workflowTask, true), nil } diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index bc801898c..847228625 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -463,7 +463,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { t.True(len(response.Decisions) > 0) closeDecision := response.Decisions[len(response.Decisions)-1] t.Equal(*closeDecision.DecisionType, s.DecisionTypeFailWorkflowExecution) - t.Contains(*closeDecision.FailWorkflowExecutionDecisionAttributes.Reason, "nondeterministic") + t.Contains(*closeDecision.FailWorkflowExecutionDecisionAttributes.Reason, "NonDeterministicWorkflowPolicyFailWorkflow") // now with different package name to activity type testEvents[4].ActivityTaskScheduledEventAttributes.ActivityType.Name = common.StringPtr("new-package.Greeter_Activity")