Skip to content

Commit

Permalink
Fix localactivity should not retry on CancelError (#890)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Dec 2, 2019
1 parent 5a70ce8 commit 516360e
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 10 deletions.
6 changes: 6 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ func NewCanceledError(details ...interface{}) *CanceledError {
return &CanceledError{details: ErrorDetailsValues(details)}
}

// IsCanceledError return whether error in CanceledError
func IsCanceledError(err error) bool {
_, ok := err.(*CanceledError)
return ok
}

// NewContinueAsNewError creates ContinueAsNewError instance
// If the workflow main function returns this error then the current execution is ended and
// the new execution with same workflow ID is started automatically with options
Expand Down
29 changes: 29 additions & 0 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,35 @@ func Test_CanceledError(t *testing.T) {
require.Equal(t, testErrorDetails3, b3)
}

func Test_IsCanceledError(t *testing.T) {

tests := []struct {
name string
err error
expected bool
}{
{
name: "empty detail",
err: NewCanceledError(),
expected: true,
},
{
name: "with detail",
err: NewCanceledError("details"),
expected: true,
},
{
name: "not canceled error",
err: errors.New("details"),
expected: false,
},
}

for _, test := range tests {
require.Equal(t, test.expected, IsCanceledError(test.err))
}
}

func TestErrorDetailsValues(t *testing.T) {
e := ErrorDetailsValues{}
require.Equal(t, ErrNoData, e.Get())
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(workflowTask *
}

func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResult) bool {
if lar.task.retryPolicy == nil || lar.err == nil || lar.err == ErrCanceled {
if lar.task.retryPolicy == nil || lar.err == nil || IsCanceledError(lar.err) {
return false
}

Expand Down
101 changes: 93 additions & 8 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() {
RegisterActivityWithOptions(testActivityHello, RegisterActivityOptions{Name: "testActivityHello"})
RegisterActivity(testActivityContext)
RegisterActivity(testActivityHeartbeat)
RegisterActivity(testActivityCanceled)
}

func TestUnitTestSuite(t *testing.T) {
Expand Down Expand Up @@ -494,6 +495,14 @@ func testActivityContext(ctx context.Context) (string, error) {
return "", fmt.Errorf("context did not propagate to workflow")
}

func testActivityCanceled(ctx context.Context) (int32, error) {
info := GetActivityInfo(ctx)
if info.Attempt < 2 {
return int32(-1), NewCanceledError("details")
}
return info.Attempt, nil
}

func testWorkflowHeartbeat(ctx Context, msg string, waitTime time.Duration) (string, error) {
ao := ActivityOptions{
ScheduleToStartTimeout: time.Minute,
Expand Down Expand Up @@ -2214,15 +2223,15 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityHeartbeatRetry() {

func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {

localActivityFn := func(ctx context.Context) (string, error) {
localActivityFn := func(ctx context.Context) (int32, error) {
info := GetActivityInfo(ctx)
if info.Attempt < 2 {
return "", NewCustomError("bad-luck")
return int32(-1), NewCustomError("bad-luck")
}
return "retry-done", nil
return info.Attempt, nil
}

workflowFn := func(ctx Context) (string, error) {
workflowFn := func(ctx Context) (int32, error) {
lao := LocalActivityOptions{
ScheduleToCloseTimeout: time.Minute,
RetryPolicy: &RetryPolicy{
Expand All @@ -2236,10 +2245,10 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {
}
ctx = WithLocalActivityOptions(ctx, lao)

var result string
var result int32
err := ExecuteLocalActivity(ctx, localActivityFn).Get(ctx, &result)
if err != nil {
return "", err
return int32(-1), err
}
return result, nil
}
Expand All @@ -2250,9 +2259,85 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {

s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
var result string
var result int32
s.NoError(env.GetWorkflowResult(&result))
s.Equal("retry-done", result)
s.Equal(int32(2), result)
}

func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetryOnCancel() {
attempts := 0
localActivityFn := func(ctx context.Context) (int32, error) {
attempts++
info := GetActivityInfo(ctx)
if info.Attempt < 2 {
return int32(-1), NewCanceledError("details")
}
return info.Attempt, nil
}

workflowFn := func(ctx Context) (int32, error) {
lao := LocalActivityOptions{
ScheduleToCloseTimeout: time.Minute,
RetryPolicy: &RetryPolicy{
MaximumAttempts: 3,
InitialInterval: time.Second,
MaximumInterval: time.Second * 10,
BackoffCoefficient: 2,
NonRetriableErrorReasons: []string{"bad-bug"},
ExpirationInterval: time.Minute,
},
}
ctx = WithLocalActivityOptions(ctx, lao)

var result int32
err := ExecuteLocalActivity(ctx, localActivityFn).Get(ctx, &result)
if err != nil {
return int32(-1), err
}
return result, nil
}

env := s.NewTestWorkflowEnvironment()
RegisterWorkflow(workflowFn)
env.ExecuteWorkflow(workflowFn)

s.True(env.IsWorkflowCompleted())
s.Error(env.GetWorkflowError())
s.True(IsCanceledError(env.GetWorkflowError()))
s.Equal(1, attempts)
}

func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetryOnCancel() {
workflowFn := func(ctx Context) (int32, error) {
ao := ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
RetryPolicy: &RetryPolicy{
MaximumAttempts: 3,
InitialInterval: time.Second,
MaximumInterval: time.Second * 10,
BackoffCoefficient: 2,
NonRetriableErrorReasons: []string{"bad-bug"},
ExpirationInterval: time.Minute,
},
}
ctx = WithActivityOptions(ctx, ao)

var result int32
err := ExecuteActivity(ctx, testActivityCanceled).Get(ctx, &result)
if err != nil {
return int32(-1), err
}
return result, nil
}

env := s.NewTestWorkflowEnvironment()
RegisterWorkflow(workflowFn)
env.ExecuteWorkflow(workflowFn)

s.True(env.IsWorkflowCompleted())
s.Error(env.GetWorkflowError())
s.True(IsCanceledError(env.GetWorkflowError()))
}

func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowRetry() {
Expand Down
2 changes: 1 addition & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func scheduleLocalActivity(ctx Context, params *executeLocalActivityParams) Futu
ctxDone.removeReceiveCallback(cancellationCallback)
}

if lar.err == nil || lar.backoff <= 0 {
if lar.err == nil || IsCanceledError(lar.err) || lar.backoff <= 0 {
f.Set(lar.result, lar.err)
return
}
Expand Down

0 comments on commit 516360e

Please sign in to comment.