Skip to content

Commit

Permalink
Handle illegal StateMachine transition as Non-Deterministic error (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Oct 25, 2018
1 parent 167c85a commit 2875316
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 28 deletions.
6 changes: 3 additions & 3 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type (

// PanicError contains information about panicked workflow/activity.
PanicError struct {
value string
value interface{}
stackTrace string
}

Expand Down Expand Up @@ -296,12 +296,12 @@ func (e *CanceledError) Details(d ...interface{}) error {
}

func newPanicError(value interface{}, stackTrace string) *PanicError {
return &PanicError{value: fmt.Sprintf("%v", value), stackTrace: stackTrace}
return &PanicError{value: value, stackTrace: stackTrace}
}

// Error from error interface
func (e *PanicError) Error() string {
return e.value
return fmt.Sprintf("%v", e.value)
}

// StackTrace return stack trace of the panic
Expand Down
23 changes: 18 additions & 5 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ type (
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
}

// panic when decision state machine is in illegal state
stateMachineIllegalStatePanic struct {
message string
}
)

const (
Expand Down Expand Up @@ -306,9 +311,17 @@ func (d *decisionStateMachineBase) moveState(newState decisionState, event strin
}
}

func (d stateMachineIllegalStatePanic) String() string {
return d.message
}

func panicIllegalState(message string) {
panic(stateMachineIllegalStatePanic{message: message})
}

func (d *decisionStateMachineBase) failStateTransition(event string) {
// this is when we detect illegal state transition, likely due to ill history sequence or nondeterministic decider code
panic(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d))
panicIllegalState(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d))
}

func (d *decisionStateMachineBase) handleDecisionSent() {
Expand Down Expand Up @@ -660,7 +673,7 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine {
if !ok {
panicMsg := fmt.Sprintf("unknown decision %v, possible causes are nondeterministic workflow definition code"+
" or incompatible change in the workflow definition", id)
panic(panicMsg)
panicIllegalState(panicMsg)
}
// Move the last update decision state machine to the back of the list.
// Otherwise decisions (like timer cancellations) can end up out of order.
Expand All @@ -671,7 +684,7 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine {
func (h *decisionsHelper) addDecision(decision decisionStateMachine) {
if _, ok := h.decisions[decision.getID()]; ok {
panicMsg := fmt.Sprintf("adding duplicate decision %v", decision)
panic(panicMsg)
panicIllegalState(panicMsg)
}
element := h.orderedDecisions.PushBack(decision)
h.decisions[decision.getID()] = element
Expand Down Expand Up @@ -730,12 +743,12 @@ func (h *decisionsHelper) getActivityID(event *s.HistoryEvent) string {
case s.EventTypeActivityTaskTimedOut:
scheduledEventID = event.ActivityTaskTimedOutEventAttributes.GetScheduledEventId()
default:
panic(fmt.Sprintf("unexpected event type %v", event.GetEventType()))
panicIllegalState(fmt.Sprintf("unexpected event type %v", event.GetEventType()))
}

activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
if !ok {
panic(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event)))
panicIllegalState(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event)))
}
return activityID
}
Expand Down
49 changes: 30 additions & 19 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,14 @@ ProcessEvents:
}
}

// Non-deterministic error could happen in 2 different places:
// 1) the replay decisions does not match to history events. This is usually due to non backwards compatible code
// change to decider logic. For example, change calling one activity to a different activity.
// 2) the decision state machine is trying to make illegal state transition while replay a history event (like
// activity task completed), but the corresponding decider code that start the event has been removed. In that case
// the replay of that event will panic on the decision state machine and the workflow will be marked as completed
// with the panic error.
var nonDeterministicErr error
if !skipReplayCheck && !w.isWorkflowCompleted {
// check if decisions from reply matches to the history events
if err := matchReplayWithHistory(replayDecisions, respondEvents); err != nil {
Expand All @@ -728,29 +736,32 @@ ProcessEvents:
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
zap.Error(err))

// Whether or not we store the error in workflowContext.err makes
// a significant difference, to the point that it affects client's observable
// behavior as far as handling non-deterministic workflows.
//
// If we store it in workflowContext.err, the decision task completion code
// will pick up the error and correctly wrap it in the response request we sent back
// to the server, which in this case will contain a request to fail the workflow.
//
// If we simply return the error, the decision task completion code path will not
// execute at all, therefore, no response is sent back to the server and we will
// look like a decision task time out.
switch w.wth.nonDeterministicWorkflowPolicy {
case NonDeterministicWorkflowPolicyFailWorkflow:
eventHandler.Complete(nil, NewCustomError("nondeterministic workflow", err.Error()))
case NonDeterministicWorkflowPolicyBlockWorkflow:
return nil, err
default:
panic(fmt.Sprintf("unknown mismatched workflow history policy."))
nonDeterministicErr = err
}
}
if nonDeterministicErr == nil && w.err != nil {
if panicErr, ok := w.err.(*PanicError); ok && panicErr.value != nil {
if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic {
nonDeterministicErr = panicErr
}
}
}

if nonDeterministicErr != nil {
switch w.wth.nonDeterministicWorkflowPolicy {
case NonDeterministicWorkflowPolicyFailWorkflow:
// complete workflow with custom error will fail the workflow
eventHandler.Complete(nil, NewCustomError("NonDeterministicWorkflowPolicyFailWorkflow", nonDeterministicErr.Error()))
case NonDeterministicWorkflowPolicyBlockWorkflow:
// return error here will be convert to DecisionTaskFailed for the first time, and ignored for subsequent
// attempts which will cause DecisionTaskTimeout and server will retry forever until issue got fixed or
// workflow timeout.
return nil, nonDeterministicErr
default:
panic(fmt.Sprintf("unknown mismatched workflow history policy."))
}
}

return w.CompleteDecisionTask(workflowTask, true), nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
t.True(len(response.Decisions) > 0)
closeDecision := response.Decisions[len(response.Decisions)-1]
t.Equal(*closeDecision.DecisionType, s.DecisionTypeFailWorkflowExecution)
t.Contains(*closeDecision.FailWorkflowExecutionDecisionAttributes.Reason, "nondeterministic")
t.Contains(*closeDecision.FailWorkflowExecutionDecisionAttributes.Reason, "NonDeterministicWorkflowPolicyFailWorkflow")

// now with different package name to activity type
testEvents[4].ActivityTaskScheduledEventAttributes.ActivityType.Name = common.StringPtr("new-package.Greeter_Activity")
Expand Down

0 comments on commit 2875316

Please sign in to comment.